You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/17 07:11:31 UTC
[1/3] beam git commit: cleanup BeamSqlRow
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 5fea74638 -> a452b8020
cleanup BeamSqlRow
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bdea7a6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bdea7a6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bdea7a6b
Branch: refs/heads/DSL_SQL
Commit: bdea7a6b62f9d131efca04266b3adbec15d66543
Parents: a976ec0
Author: James Xu <xu...@gmail.com>
Authored: Thu Jul 13 18:42:02 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Fri Jul 14 11:15:53 2017 +0800
----------------------------------------------------------------------
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 185 ++++---------------
1 file changed, 37 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bdea7a6b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index b21a018..082d92a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -19,10 +19,13 @@ package org.apache.beam.dsls.sql.schema;
import java.io.Serializable;
import java.math.BigDecimal;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.GregorianCalendar;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -35,6 +38,25 @@ import org.joda.time.Instant;
*
*/
public class BeamSqlRow implements Serializable {
+ private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+ static {
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+ }
+
private List<Integer> nullFields = new ArrayList<>();
private List<Object> dataValues;
private BeamSqlRecordType dataType;
@@ -82,78 +104,23 @@ public class BeamSqlRow implements Serializable {
}
}
- SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
- switch (fieldType) {
- case INTEGER:
- if (!(fieldValue instanceof Integer)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case SMALLINT:
- if (!(fieldValue instanceof Short)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case TINYINT:
- if (!(fieldValue instanceof Byte)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case DOUBLE:
- if (!(fieldValue instanceof Double)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case BIGINT:
- if (!(fieldValue instanceof Long)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case FLOAT:
- if (!(fieldValue instanceof Float)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case DECIMAL:
- if (!(fieldValue instanceof BigDecimal)) {
- throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case VARCHAR:
- case CHAR:
- if (!(fieldValue instanceof String)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case TIME:
- if (!(fieldValue instanceof GregorianCalendar)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- case TIMESTAMP:
- case DATE:
- if (!(fieldValue instanceof Date)) {
- throw new IllegalArgumentException(
- getTypeMismatchErrorMessage(fieldValue, fieldType));
- }
- break;
- default:
- throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
- }
+ validateValueType(index, fieldValue);
dataValues.set(index, fieldValue);
}
- private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) {
- return String.format("[%s](%s) doesn't match type [%s]",
- fieldValue, fieldValue.getClass(), fieldType);
+ private void validateValueType(int index, Object fieldValue) {
+ SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+ Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+ if (javaClazz == null) {
+ throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+ }
+
+ if (!fieldValue.getClass().equals(javaClazz)) {
+ throw new IllegalArgumentException(
+ String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType)
+ );
+ }
}
public Object getFieldValue(String fieldName) {
@@ -205,85 +172,7 @@ public class BeamSqlRow implements Serializable {
return null;
}
- Object fieldValue = dataValues.get(fieldIdx);
- SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, fieldIdx);
-
- switch (fieldType) {
- case INTEGER:
- if (!(fieldValue instanceof Integer)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case SMALLINT:
- if (!(fieldValue instanceof Short)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TINYINT:
- if (!(fieldValue instanceof Byte)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case DOUBLE:
- if (!(fieldValue instanceof Double)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case DECIMAL:
- if (!(fieldValue instanceof BigDecimal)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case BIGINT:
- if (!(fieldValue instanceof Long)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case FLOAT:
- if (!(fieldValue instanceof Float)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case VARCHAR:
- case CHAR:
- if (!(fieldValue instanceof String)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TIME:
- if (!(fieldValue instanceof GregorianCalendar)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TIMESTAMP:
- case DATE:
- if (!(fieldValue instanceof Date)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- default:
- throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
- }
+ return dataValues.get(fieldIdx);
}
public byte getByte(int idx) {
[2/3] beam git commit: [BEAM-2560] Add integration test for
arithmetic operators.
Posted by lz...@apache.org.
[BEAM-2560] Add integration test for arithmetic operators.
And also refactor BeamSqlStringFunctionsIntegrationTest to use ExpressionChecker
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a976ec04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a976ec04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a976ec04
Branch: refs/heads/DSL_SQL
Commit: a976ec042d9c2a2925b4b3c9f31261ea7b324f46
Parents: 5fea746
Author: James Xu <xu...@gmail.com>
Authored: Fri Jul 7 11:04:46 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Fri Jul 14 11:15:53 2017 +0800
----------------------------------------------------------------------
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 3 +
.../arithmetic/BeamSqlArithmeticExpression.java | 120 +++++++------
.../arithmetic/BeamSqlDivideExpression.java | 13 +-
.../arithmetic/BeamSqlMinusExpression.java | 10 +-
.../arithmetic/BeamSqlModExpression.java | 12 +-
.../arithmetic/BeamSqlMultiplyExpression.java | 10 +-
.../arithmetic/BeamSqlPlusExpression.java | 10 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 27 +--
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 2 +
.../beam/dsls/sql/utils/CalciteUtils.java | 1 +
.../org/apache/beam/dsls/sql/TestUtils.java | 18 +-
...amSqlArithmeticOperatorsIntegrationTest.java | 162 ++++++++++++++++++
...mSqlBuiltinFunctionsIntegrationTestBase.java | 168 +++++++++++++++++++
.../BeamSqlStringFunctionsIntegrationTest.java | 85 +++-------
.../BeamSqlArithmeticExpressionTest.java | 42 ++---
.../beam/dsls/sql/mock/MockedBoundedTable.java | 10 +-
.../dsls/sql/mock/MockedUnboundedTable.java | 3 +-
17 files changed, 500 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 5d0ce29..de4112d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -165,6 +165,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
case BIGINT:
realValue = rawValue.longValue();
break;
+ case DECIMAL:
+ realValue = rawValue;
+ break;
default:
throw new IllegalStateException("type/realType mismatch: "
+ type + " VS " + realType);
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index f3fd68f..eac4c72 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -18,8 +18,9 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -29,14 +30,53 @@ import org.apache.calcite.sql.type.SqlTypeName;
* Base class for all arithmetic operators.
*/
public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
- private BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+ static {
+ ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+ }
+
+ protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+ super(operands, deduceOutputType(operands.get(0).getOutputType(),
+ operands.get(1).getOutputType()));
+ }
+
+ protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
super(operands, outputType);
}
- public BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
- // the outputType can not be determined in constructor
- // will be determined in evaluate() method. ANY here is just a placeholder.
- super(operands, SqlTypeName.ANY);
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+ BigDecimal left = BigDecimal.valueOf(
+ Double.valueOf(opValueEvaluated(0, inputRecord).toString()));
+ BigDecimal right = BigDecimal.valueOf(
+ Double.valueOf(opValueEvaluated(1, inputRecord).toString()));
+
+ BigDecimal result = calc(left, right);
+ return getCorrectlyTypedResult(result);
+ }
+
+ protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+ protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+ int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+ int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+ if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+ && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+ return SqlTypeName.DOUBLE;
+ }
+
+ if (leftIndex < rightIndex) {
+ return right;
+ } else if (leftIndex > rightIndex) {
+ return left;
+ } else {
+ return left;
+ }
}
@Override public boolean accept() {
@@ -52,49 +92,31 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
return true;
}
- /**
- * https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html.
- */
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
- BeamSqlExpression leftOp = operands.get(0);
- BeamSqlExpression rightOp = operands.get(1);
-
- // In the case of -, +, and *, the result is calculated as Long if both
- // operands are INT_TYPES(byte, short, integer, long).
- if (SqlTypeName.INT_TYPES.contains(leftOp.getOutputType())
- && SqlTypeName.INT_TYPES.contains(rightOp.getOutputType())) {
- Long leftValue = Long.valueOf(leftOp.evaluate(inputRecord).getValue().toString());
- Long rightValue = Long.valueOf(rightOp.evaluate(inputRecord).getValue().toString());
- Long ret = calc(leftValue, rightValue);
- return BeamSqlPrimitive.of(SqlTypeName.BIGINT, ret);
- } else {
- // If any of the operands of a +, -, /, *, % is a real
- // OR
- // It is a division calculation
- // we treat them as Double
- double leftValue = getDouble(inputRecord, leftOp);
- double rightValue = getDouble(inputRecord, rightOp);
- return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, calc(leftValue, rightValue));
+ protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+ Number actualValue;
+ switch (outputType) {
+ case TINYINT:
+ actualValue = rawResult.byteValue();
+ break;
+ case SMALLINT:
+ actualValue = rawResult.shortValue();
+ break;
+ case INTEGER:
+ actualValue = rawResult.intValue();
+ break;
+ case BIGINT:
+ actualValue = rawResult.longValue();
+ break;
+ case FLOAT:
+ actualValue = rawResult.floatValue();
+ break;
+ case DOUBLE:
+ actualValue = rawResult.doubleValue();
+ break;
+ case DECIMAL:
+ default:
+ actualValue = rawResult;
}
+ return BeamSqlPrimitive.of(outputType, actualValue);
}
-
- private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
- Object raw = op.evaluate(inputRecord).getValue();
- if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {
- return ((Number) raw).doubleValue();
- }
- throw new IllegalStateException(
- String.format("Can't build a valid arithmetic expression with argument %s", raw));
- }
-
- /**
- * For {@link SqlTypeName#INT_TYPES} calculation of '+', '-', '*'.
- */
- public abstract Long calc(Long left, Long right);
-
-
- /**
- * For other {@link SqlTypeName#NUMERIC_TYPES} of '+', '-', '*', '/'.
- */
- public abstract Double calc(Number left, Number right);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
index 907b1fc..db3fac6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
/**
@@ -30,14 +30,7 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
super(operands);
}
- @Override public Long calc(Long left, Long right) {
- return left / right;
- }
-
- @Override public Double calc(Number left, Number right) {
- if (right.doubleValue() == 0) {
- throw new IllegalArgumentException("divisor cannot be 0");
- }
- return left.doubleValue() / right.doubleValue();
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.divide(right);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
index c6d7ca0..fe08870 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
/**
@@ -30,11 +30,7 @@ public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
super(operands);
}
- @Override public Long calc(Long left, Long right) {
- return left - right;
- }
-
- @Override public Double calc(Number left, Number right) {
- return left.doubleValue() - right.doubleValue();
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.subtract(right);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
index 6323e95..11ecf25 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
/**
@@ -27,14 +27,10 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
*/
public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
public BeamSqlModExpression(List<BeamSqlExpression> operands) {
- super(operands);
- }
-
- @Override public Long calc(Long left, Long right) {
- return left % right;
+ super(operands, operands.get(1).getOutputType());
}
- @Override public Double calc(Number left, Number right) {
- return left.doubleValue() % right.doubleValue();
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return BigDecimal.valueOf(left.doubleValue() % right.doubleValue());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
index 42ba4a5..e16d3cb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
/**
@@ -30,11 +30,7 @@ public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
super(operands);
}
- @Override public Long calc(Long left, Long right) {
- return left * right;
- }
-
- @Override public Double calc(Number left, Number right) {
- return left.doubleValue() * right.doubleValue();
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.multiply(right);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
index 59be053..5804279 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+import java.math.BigDecimal;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
/**
@@ -30,11 +30,7 @@ public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
super(operands);
}
- @Override public Double calc(Number left, Number right) {
- return left.doubleValue() + right.doubleValue();
- }
-
- @Override public Long calc(Long left, Long right) {
- return left + right;
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.add(right);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index db0ce04..b21a018 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -87,63 +87,62 @@ public class BeamSqlRow implements Serializable {
case INTEGER:
if (!(fieldValue instanceof Integer)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case SMALLINT:
if (!(fieldValue instanceof Short)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case TINYINT:
if (!(fieldValue instanceof Byte)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case DOUBLE:
if (!(fieldValue instanceof Double)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case BIGINT:
if (!(fieldValue instanceof Long)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case FLOAT:
if (!(fieldValue instanceof Float)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case DECIMAL:
if (!(fieldValue instanceof BigDecimal)) {
- throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case VARCHAR:
case CHAR:
if (!(fieldValue instanceof String)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case TIME:
if (!(fieldValue instanceof GregorianCalendar)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
case TIMESTAMP:
case DATE:
if (!(fieldValue instanceof Date)) {
throw new IllegalArgumentException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ getTypeMismatchErrorMessage(fieldValue, fieldType));
}
break;
default:
@@ -152,6 +151,11 @@ public class BeamSqlRow implements Serializable {
dataValues.set(index, fieldValue);
}
+ private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) {
+ return String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType);
+ }
+
public Object getFieldValue(String fieldName) {
return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
}
@@ -270,6 +274,7 @@ public class BeamSqlRow implements Serializable {
return fieldValue;
}
case TIMESTAMP:
+ case DATE:
if (!(fieldValue instanceof Date)) {
throw new IllegalArgumentException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index d53ba8d..8be5212 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -91,6 +91,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
case TIME:
longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
break;
+ case DATE:
case TIMESTAMP:
longCoder.encode(value.getDate(idx).getTime(), outStream);
break;
@@ -147,6 +148,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
calendar.setTime(new Date(longCoder.decode(inStream)));
record.addField(idx, calendar);
break;
+ case DATE:
case TIMESTAMP:
record.addField(idx, new Date(longCoder.decode(inStream)));
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index ac395d3..6aa6e62 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -50,6 +50,7 @@ public class CalciteUtils {
JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
+ JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 3294592..8c0a28d 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -19,6 +19,7 @@
package org.apache.beam.dsls.sql;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn;
* Test utilities.
*/
public class TestUtils {
-
/**
* A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
*/
@@ -116,6 +116,16 @@ public class TestUtils {
* <p>Note: check the class javadoc for for detailed example.
*/
public RowsBuilder addRows(final Object... args) {
+ this.rows.addAll(buildRows(type, Arrays.asList(args)));
+ return this;
+ }
+
+ /**
+ * Add rows to the builder.
+ *
+ * <p>Note: check the class javadoc for for detailed example.
+ */
+ public RowsBuilder addRows(final List args) {
this.rows.addAll(buildRows(type, args));
return this;
}
@@ -169,14 +179,14 @@ public class TestUtils {
* )
* }</pre>
*/
- public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) {
+ public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) {
List<BeamSqlRow> rows = new ArrayList<>();
int fieldCount = type.size();
- for (int i = 0; i < args.length; i += fieldCount) {
+ for (int i = 0; i < args.size(); i += fieldCount) {
BeamSqlRow row = new BeamSqlRow(type);
for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args[i + j]);
+ row.addField(j, args.get(i + j));
}
rows.add(row);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
new file mode 100644
index 0000000..3d7bf28
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import org.junit.Test;
+
+/**
+ * Integration test for arithmetic operators.
+ */
+public class BeamSqlArithmeticOperatorsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+ private static final BigDecimal ZERO = BigDecimal.valueOf(0.0);
+ private static final BigDecimal ONE0 = BigDecimal.valueOf(1);
+ private static final BigDecimal ONE = BigDecimal.valueOf(1.0);
+ private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0));
+ private static final BigDecimal TWO = BigDecimal.valueOf(2.0);
+
+ @Test
+ public void testPlus() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 + 1", 2)
+ .addExpr("1.0 + 1", TWO)
+ .addExpr("1 + 1.0", TWO)
+ .addExpr("1.0 + 1.0", TWO)
+ .addExpr("c_tinyint + c_tinyint", (byte) 2)
+ .addExpr("c_smallint + c_smallint", (short) 2)
+ .addExpr("c_bigint + c_bigint", 2L)
+ .addExpr("c_decimal + c_decimal", TWO)
+ .addExpr("c_tinyint + c_decimal", TWO)
+ .addExpr("c_float + c_decimal", 2.0)
+ .addExpr("c_double + c_decimal", 2.0)
+ .addExpr("c_float + c_float", 2.0f)
+ .addExpr("c_double + c_float", 2.0)
+ .addExpr("c_double + c_double", 2.0)
+ .addExpr("c_float + c_bigint", 2.0f)
+ .addExpr("c_double + c_bigint", 2.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testPlus_overflow() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_max + c_tinyint_max", -2)
+ .addExpr("c_smallint_max + c_smallint_max", -2)
+ .addExpr("c_integer_max + c_integer_max", -2)
+ // yeah, I know 384L is strange, but since it is already overflowed
+ // what the actualy result is not so important, it is wrong any way.
+ .addExpr("c_bigint_max + c_bigint_max", 384L)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMinus() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 - 1", 0)
+ .addExpr("1.0 - 1", ZERO)
+ .addExpr("1 - 0.0", ONE)
+ .addExpr("1.0 - 1.0", ZERO)
+ .addExpr("c_tinyint - c_tinyint", (byte) 0)
+ .addExpr("c_smallint - c_smallint", (short) 0)
+ .addExpr("c_bigint - c_bigint", 0L)
+ .addExpr("c_decimal - c_decimal", ZERO)
+ .addExpr("c_tinyint - c_decimal", ZERO)
+ .addExpr("c_float - c_decimal", 0.0)
+ .addExpr("c_double - c_decimal", 0.0)
+ .addExpr("c_float - c_float", 0.0f)
+ .addExpr("c_double - c_float", 0.0)
+ .addExpr("c_double - c_double", 0.0)
+ .addExpr("c_float - c_bigint", 0.0f)
+ .addExpr("c_double - c_bigint", 0.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMultiply() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 * 1", 1)
+ .addExpr("1.0 * 1", ONE2)
+ .addExpr("1 * 1.0", ONE2)
+ .addExpr("1.0 * 1.0", ONE2)
+ .addExpr("c_tinyint * c_tinyint", (byte) 1)
+ .addExpr("c_smallint * c_smallint", (short) 1)
+ .addExpr("c_bigint * c_bigint", 1L)
+ .addExpr("c_decimal * c_decimal", ONE2)
+ .addExpr("c_tinyint * c_decimal", ONE2)
+ .addExpr("c_float * c_decimal", 1.0)
+ .addExpr("c_double * c_decimal", 1.0)
+ .addExpr("c_float * c_float", 1.0f)
+ .addExpr("c_double * c_float", 1.0)
+ .addExpr("c_double * c_double", 1.0)
+ .addExpr("c_float * c_bigint", 1.0f)
+ .addExpr("c_double * c_bigint", 1.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testDivide() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 / 1", 1)
+ .addExpr("1.0 / 1", ONE0)
+ .addExpr("1 / 1.0", ONE0)
+ .addExpr("1.0 / 1.0", ONE0)
+ .addExpr("c_tinyint / c_tinyint", (byte) 1)
+ .addExpr("c_smallint / c_smallint", (short) 1)
+ .addExpr("c_bigint / c_bigint", 1L)
+ .addExpr("c_decimal / c_decimal", ONE0)
+ .addExpr("c_tinyint / c_decimal", ONE0)
+ .addExpr("c_float / c_decimal", 1.0)
+ .addExpr("c_double / c_decimal", 1.0)
+ .addExpr("c_float / c_float", 1.0f)
+ .addExpr("c_double / c_float", 1.0)
+ .addExpr("c_double / c_double", 1.0)
+ .addExpr("c_float / c_bigint", 1.0f)
+ .addExpr("c_double / c_bigint", 1.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMod() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("mod(1, 1)", 0)
+ .addExpr("mod(1.0, 1)", 0)
+ .addExpr("mod(1, 1.0)", ZERO)
+ .addExpr("mod(1.0, 1.0)", ZERO)
+ .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0)
+ .addExpr("mod(c_smallint, c_smallint)", (short) 0)
+ .addExpr("mod(c_bigint, c_bigint)", 0L)
+ .addExpr("mod(c_decimal, c_decimal)", ZERO)
+ .addExpr("mod(c_tinyint, c_decimal)", ZERO)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
new file mode 100644
index 0000000..e65e747
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import com.google.common.base.Joiner;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.util.Pair;
+import org.junit.Rule;
+
+/**
+ * Base class for all built-in functions integration tests.
+ */
+public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+ private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
+ static {
+ JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
+ JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
+ JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
+ JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
+ JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
+ }
+
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+
+ protected PCollection<BeamSqlRow> getTestPCollection() {
+ BeamSqlRecordType type = BeamSqlRecordType.create(
+ Arrays.asList("ts", "c_tinyint", "c_smallint",
+ "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
+ "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
+ Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
+ Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
+ Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
+ );
+ try {
+ return MockedBoundedTable
+ .of(type)
+ .addRows(
+ parseDate("1986-02-15 11:35:26"),
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0,
+ BigDecimal.ONE,
+ (byte) 127,
+ (short) 32767,
+ 2147483647,
+ 9223372036854775807L
+ )
+ .buildIOReader(pipeline)
+ .setCoder(new BeamSqlRowCoder(type));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Date parseDate(String str) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ return sdf.parse(str);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Helper class to make write integration test for built-in functions easier.
+ *
+ * <p>example usage:
+ * <pre>{@code
+ * ExpressionChecker checker = new ExpressionChecker()
+ * .addExpr("1 + 1", 2)
+ * .addExpr("1.0 + 1", 2.0)
+ * .addExpr("1 + 1.0", 2.0)
+ * .addExpr("1.0 + 1.0", 2.0)
+ * .addExpr("c_tinyint + c_tinyint", (byte) 2);
+ * checker.buildRunAndCheck(inputCollections);
+ * }</pre>
+ */
+ public class ExpressionChecker {
+ private transient List<Pair<String, Object>> exps = new ArrayList<>();
+
+ public ExpressionChecker addExpr(String expression, Object expectedValue) {
+ exps.add(Pair.of(expression, expectedValue));
+ return this;
+ }
+
+ private String getSql() {
+ List<String> expStrs = new ArrayList<>();
+ for (Pair<String, Object> pair : exps) {
+ expStrs.add(pair.getKey());
+ }
+ return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION";
+ }
+
+ /**
+ * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
+ */
+ public void buildRunAndCheck() {
+ PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+ System.out.println("SQL:>\n" + getSql());
+ try {
+ List<String> names = new ArrayList<>();
+ List<Integer> types = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+
+ for (Pair<String, Object> pair : exps) {
+ names.add(pair.getKey());
+ types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
+ values.add(pair.getValue());
+ }
+
+ PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder
+ .of(BeamSqlRecordType.create(names, types))
+ .addRows(values)
+ .getRows()
+ );
+ inputCollection.getPipeline().run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
index 11465f5..e28581f 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -17,76 +17,35 @@
*/
package org.apache.beam.dsls.sql.integrationtest;
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
import org.junit.Test;
/**
* Integration test for string functions.
*/
-public class BeamSqlStringFunctionsIntegrationTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
+public class BeamSqlStringFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
@Test
public void testStringFunctions() throws Exception {
- String sql = "SELECT "
- + "'hello' || ' world' as concat,"
- + "CHAR_LENGTH('hello') as cl,"
- + "CHARACTER_LENGTH('hello') as cl1,"
- + "UPPER('hello') as up,"
- + "LOWER('HELLO') as lo,"
- + "POSITION('world' IN 'helloworld') as po,"
- + "POSITION('world' IN 'helloworldworld' FROM 7) as po1,"
- + "TRIM(' hello ') as tr,"
- + "TRIM(LEADING ' ' FROM ' hello ') as tr1,"
- + "TRIM(TRAILING ' ' FROM ' hello ') as tr2,"
- + "TRIM(BOTH ' ' FROM ' hello ') as tr3,"
- + "OVERLAY('w3333333rce' PLACING 'resou' FROM 3) as ol,"
- + "SUBSTRING('hello' FROM 2) as ss,"
- + "SUBSTRING('hello' FROM 2 FOR 2) as ss1,"
- + "INITCAP('hello world') as ss1"
- ;
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("'hello' || ' world'", "hello world")
+ .addExpr("CHAR_LENGTH('hello')", 5)
+ .addExpr("CHARACTER_LENGTH('hello')", 5)
+ .addExpr("UPPER('hello')", "HELLO")
+ .addExpr("LOWER('HELLO')", "hello")
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- // 1 -> 5
- Types.VARCHAR, "concat",
- Types.INTEGER, "cl",
- Types.INTEGER, "cl1",
- Types.VARCHAR, "up",
- Types.VARCHAR, "lo",
- // 6 -> 10
- Types.INTEGER, "po",
- Types.INTEGER, "po1",
- Types.VARCHAR, "tr",
- Types.VARCHAR, "tr1",
- Types.VARCHAR, "tr2",
- // 11 -> 15
- Types.VARCHAR, "tr3",
- Types.VARCHAR, "ol",
- Types.VARCHAR, "ss",
- Types.VARCHAR, "ss1",
- Types.VARCHAR, "ic"
- ).addRows(
- // 1 -> 5(lo)
- "hello world", 5, 5, "HELLO", "hello",
- // 6 -> 10()
- 5, 10, "hello", "hello ", " hello",
- // 11 -> 15
- "hello", "w3resou3rce", "ello", "el", "Hello World"
- ).getRows());
- pipeline.run();
- }
+ .addExpr("POSITION('world' IN 'helloworld')", 5)
+ .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+ .addExpr("TRIM(' hello ')", "hello")
+ .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
+ .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
+ .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
+ .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
+ .addExpr("SUBSTRING('hello' FROM 2)", "ello")
+ .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
+ .addExpr("INITCAP('hello world')", "Hello World")
+ ;
+
+ checker.buildRunAndCheck();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
index fc28180..a34f109 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
@@ -82,10 +81,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
@Test public void testPlus() {
List<BeamSqlExpression> operands = new ArrayList<>();
- // integer + integer => long
+ // integer + integer => integer
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+ assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
// integer + long => long
operands.clear();
@@ -99,11 +98,11 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
- // float + long => double
+ // float + long => float
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
- assertEquals(Double.valueOf(Double.valueOf(1.1F) + 1),
+ assertEquals(Float.valueOf(1.1F + 1),
new BeamSqlPlusExpression(operands).evaluate(record).getValue());
// double + long => double
@@ -119,7 +118,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
// integer + integer => long
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+ assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
// integer + long => long
operands.clear();
@@ -137,8 +136,8 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
- assertEquals(Double.valueOf(Double.valueOf(2.1F) - 1),
- new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+ assertEquals(2.1F - 1L,
+ new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
// double + long => double
operands.clear();
@@ -150,10 +149,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
@Test public void testMultiply() {
List<BeamSqlExpression> operands = new ArrayList<>();
- // integer + integer => long
+ // integer + integer => integer
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+ assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
// integer + long => long
operands.clear();
@@ -171,7 +170,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
- assertEquals(Double.valueOf(Double.valueOf(2.1F) * 1),
+ assertEquals(Float.valueOf(2.1F * 1L),
new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
// double + long => double
@@ -184,10 +183,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
@Test public void testDivide() {
List<BeamSqlExpression> operands = new ArrayList<>();
- // integer + integer => long
+ // integer + integer => integer
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+ assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
// integer + long => long
operands.clear();
@@ -205,7 +204,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
operands.clear();
operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
- assertEquals(Double.valueOf(Double.valueOf(2.1F) / 1),
+ assertEquals(2.1F / 1,
new BeamSqlDivideExpression(operands).evaluate(record).getValue());
// double + long => double
@@ -221,7 +220,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
// integer + integer => long
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
- assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+ assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
// integer + long => long
operands.clear();
@@ -234,18 +233,5 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
-
- // float + long => double
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 3.1F));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
- assertEquals(Double.valueOf(Double.valueOf(3.1F) % 2),
- new BeamSqlModExpression(operands).evaluate(record).getValue());
-
- // double + long => double
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
- assertEquals(1.1, new BeamSqlModExpression(operands).evaluate(record).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
index 0fb8a80..84f49a9 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -21,6 +21,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
import static org.apache.beam.dsls.sql.TestUtils.buildRows;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.dsls.sql.schema.BeamIOType;
@@ -65,6 +66,13 @@ public class MockedBoundedTable extends MockedTable {
return new MockedBoundedTable(buildBeamSqlRecordType(args));
}
+ /**
+ * Build a mocked bounded table with the specified type.
+ */
+ public static MockedBoundedTable of(final BeamSqlRecordType type) {
+ return new MockedBoundedTable(type);
+ }
+
/**
* Add rows to the builder.
@@ -80,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
* }</pre>
*/
public MockedBoundedTable addRows(Object... args) {
- List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+ List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
this.rows.addAll(rows);
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
index 12d8d37..0f8c912 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -22,6 +22,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
import static org.apache.beam.dsls.sql.TestUtils.buildRows;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
@@ -84,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable {
* }</pre>
*/
public MockedUnboundedTable addRows(Duration duration, Object... args) {
- List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+ List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
// record the watermark + rows
this.timestampedRows.add(Pair.of(duration, rows));
return this;
[3/3] beam git commit: This closes #3557
Posted by lz...@apache.org.
This closes #3557
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a452b802
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a452b802
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a452b802
Branch: refs/heads/DSL_SQL
Commit: a452b802008c44083afbbfaf79f65f20b5dcd03b
Parents: 5fea746 bdea7a6
Author: JingsongLi <lz...@aliyun.com>
Authored: Mon Jul 17 15:11:02 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Mon Jul 17 15:11:02 2017 +0800
----------------------------------------------------------------------
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 3 +
.../arithmetic/BeamSqlArithmeticExpression.java | 120 +++++++-----
.../arithmetic/BeamSqlDivideExpression.java | 13 +-
.../arithmetic/BeamSqlMinusExpression.java | 10 +-
.../arithmetic/BeamSqlModExpression.java | 12 +-
.../arithmetic/BeamSqlMultiplyExpression.java | 10 +-
.../arithmetic/BeamSqlPlusExpression.java | 10 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 182 ++++---------------
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 2 +
.../beam/dsls/sql/utils/CalciteUtils.java | 1 +
.../org/apache/beam/dsls/sql/TestUtils.java | 18 +-
...amSqlArithmeticOperatorsIntegrationTest.java | 162 +++++++++++++++++
...mSqlBuiltinFunctionsIntegrationTestBase.java | 168 +++++++++++++++++
.../BeamSqlStringFunctionsIntegrationTest.java | 85 +++------
.../BeamSqlArithmeticExpressionTest.java | 42 ++---
.../beam/dsls/sql/mock/MockedBoundedTable.java | 10 +-
.../dsls/sql/mock/MockedUnboundedTable.java | 3 +-
17 files changed, 522 insertions(+), 329 deletions(-)
----------------------------------------------------------------------