You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/17 07:11:32 UTC

[2/3] beam git commit: [BEAM-2560] Add integration test for arithmetic operators.

[BEAM-2560] Add integration test for arithmetic operators.

And also refactor BeamSqlStringFunctionsIntegrationTest to use ExpressionChecker


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a976ec04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a976ec04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a976ec04

Branch: refs/heads/DSL_SQL
Commit: a976ec042d9c2a2925b4b3c9f31261ea7b324f46
Parents: 5fea746
Author: James Xu <xu...@gmail.com>
Authored: Fri Jul 7 11:04:46 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Fri Jul 14 11:15:53 2017 +0800

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |   3 +
 .../arithmetic/BeamSqlArithmeticExpression.java | 120 +++++++------
 .../arithmetic/BeamSqlDivideExpression.java     |  13 +-
 .../arithmetic/BeamSqlMinusExpression.java      |  10 +-
 .../arithmetic/BeamSqlModExpression.java        |  12 +-
 .../arithmetic/BeamSqlMultiplyExpression.java   |  10 +-
 .../arithmetic/BeamSqlPlusExpression.java       |  10 +-
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |  27 +--
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |   2 +
 .../beam/dsls/sql/utils/CalciteUtils.java       |   1 +
 .../org/apache/beam/dsls/sql/TestUtils.java     |  18 +-
 ...amSqlArithmeticOperatorsIntegrationTest.java | 162 ++++++++++++++++++
 ...mSqlBuiltinFunctionsIntegrationTestBase.java | 168 +++++++++++++++++++
 .../BeamSqlStringFunctionsIntegrationTest.java  |  85 +++-------
 .../BeamSqlArithmeticExpressionTest.java        |  42 ++---
 .../beam/dsls/sql/mock/MockedBoundedTable.java  |  10 +-
 .../dsls/sql/mock/MockedUnboundedTable.java     |   3 +-
 17 files changed, 500 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 5d0ce29..de4112d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -165,6 +165,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
             case BIGINT:
               realValue = rawValue.longValue();
               break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
             default:
               throw new IllegalStateException("type/realType mismatch: "
                   + type + " VS " + realType);

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index f3fd68f..eac4c72 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -18,8 +18,9 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -29,14 +30,53 @@ import org.apache.calcite.sql.type.SqlTypeName;
  * Base class for all arithmetic operators.
  */
 public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
-  private BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+  static {
+    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands.get(0).getOutputType(),
+        operands.get(1).getOutputType()));
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
     super(operands, outputType);
   }
 
-  public BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
-    // the outputType can not be determined in constructor
-    // will be determined in evaluate() method. ANY here is just a placeholder.
-    super(operands, SqlTypeName.ANY);
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+    BigDecimal left = BigDecimal.valueOf(
+        Double.valueOf(opValueEvaluated(0, inputRecord).toString()));
+    BigDecimal right = BigDecimal.valueOf(
+        Double.valueOf(opValueEvaluated(1, inputRecord).toString()));
+
+    BigDecimal result = calc(left, right);
+    return getCorrectlyTypedResult(result);
+  }
+
+  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+    int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+    int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+    if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+        && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+      return SqlTypeName.DOUBLE;
+    }
+
+    if (leftIndex < rightIndex) {
+      return right;
+    } else if (leftIndex > rightIndex) {
+      return left;
+    } else {
+      return left;
+    }
   }
 
   @Override public boolean accept() {
@@ -52,49 +92,31 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
     return true;
   }
 
-  /**
-   * https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html.
-   */
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
-    BeamSqlExpression leftOp = operands.get(0);
-    BeamSqlExpression rightOp = operands.get(1);
-
-    // In the case of -, +, and *, the result is calculated as Long if both
-    // operands are INT_TYPES(byte, short, integer, long).
-    if (SqlTypeName.INT_TYPES.contains(leftOp.getOutputType())
-        && SqlTypeName.INT_TYPES.contains(rightOp.getOutputType())) {
-      Long leftValue = Long.valueOf(leftOp.evaluate(inputRecord).getValue().toString());
-      Long rightValue = Long.valueOf(rightOp.evaluate(inputRecord).getValue().toString());
-      Long ret = calc(leftValue, rightValue);
-      return BeamSqlPrimitive.of(SqlTypeName.BIGINT, ret);
-    } else {
-      // If any of the operands of a +, -, /, *, % is a real
-      //  OR
-      // It is a division calculation
-      // we treat them as Double
-      double leftValue = getDouble(inputRecord, leftOp);
-      double rightValue = getDouble(inputRecord, rightOp);
-      return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, calc(leftValue, rightValue));
+  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+    Number actualValue;
+    switch (outputType) {
+      case TINYINT:
+        actualValue = rawResult.byteValue();
+        break;
+      case SMALLINT:
+        actualValue = rawResult.shortValue();
+        break;
+      case INTEGER:
+        actualValue = rawResult.intValue();
+        break;
+      case BIGINT:
+        actualValue = rawResult.longValue();
+        break;
+      case FLOAT:
+        actualValue = rawResult.floatValue();
+        break;
+      case DOUBLE:
+        actualValue = rawResult.doubleValue();
+        break;
+      case DECIMAL:
+      default:
+        actualValue = rawResult;
     }
+    return BeamSqlPrimitive.of(outputType, actualValue);
   }
-
-  private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
-    Object raw = op.evaluate(inputRecord).getValue();
-    if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {
-      return ((Number) raw).doubleValue();
-    }
-    throw new IllegalStateException(
-        String.format("Can't build a valid arithmetic expression with argument %s", raw));
-  }
-
-  /**
-   * For {@link SqlTypeName#INT_TYPES} calculation of '+', '-', '*'.
-   */
-  public abstract Long calc(Long left, Long right);
-
-
-  /**
-   * For other {@link SqlTypeName#NUMERIC_TYPES} of '+', '-', '*', '/'.
-   */
-  public abstract Double calc(Number left, Number right);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
index 907b1fc..db3fac6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 
 /**
@@ -30,14 +30,7 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
     super(operands);
   }
 
-  @Override public Long calc(Long left, Long right) {
-    return left / right;
-  }
-
-  @Override public Double calc(Number left, Number right) {
-    if (right.doubleValue() == 0) {
-      throw new IllegalArgumentException("divisor cannot be 0");
-    }
-    return left.doubleValue() / right.doubleValue();
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.divide(right);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
index c6d7ca0..fe08870 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 
 /**
@@ -30,11 +30,7 @@ public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
     super(operands);
   }
 
-  @Override public Long calc(Long left, Long right) {
-    return left - right;
-  }
-
-  @Override public Double calc(Number left, Number right) {
-    return left.doubleValue() - right.doubleValue();
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.subtract(right);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
index 6323e95..11ecf25 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 
 /**
@@ -27,14 +27,10 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
  */
 public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
   public BeamSqlModExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override public Long calc(Long left, Long right) {
-    return left % right;
+    super(operands, operands.get(1).getOutputType());
   }
 
-  @Override public Double calc(Number left, Number right) {
-    return left.doubleValue() % right.doubleValue();
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return BigDecimal.valueOf(left.doubleValue() % right.doubleValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
index 42ba4a5..e16d3cb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 
 /**
@@ -30,11 +30,7 @@ public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
     super(operands);
   }
 
-  @Override public Long calc(Long left, Long right) {
-    return left * right;
-  }
-
-  @Override public Double calc(Number left, Number right) {
-    return left.doubleValue() * right.doubleValue();
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.multiply(right);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
index 59be053..5804279 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
 
+import java.math.BigDecimal;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 
 /**
@@ -30,11 +30,7 @@ public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
     super(operands);
   }
 
-  @Override public Double calc(Number left, Number right) {
-    return left.doubleValue() + right.doubleValue();
-  }
-
-  @Override public Long calc(Long left, Long right) {
-    return left + right;
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.add(right);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index db0ce04..b21a018 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -87,63 +87,62 @@ public class BeamSqlRow implements Serializable {
       case INTEGER:
         if (!(fieldValue instanceof Integer)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case SMALLINT:
         if (!(fieldValue instanceof Short)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case TINYINT:
         if (!(fieldValue instanceof Byte)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case DOUBLE:
         if (!(fieldValue instanceof Double)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case BIGINT:
         if (!(fieldValue instanceof Long)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case FLOAT:
         if (!(fieldValue instanceof Float)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case DECIMAL:
         if (!(fieldValue instanceof BigDecimal)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+          throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case VARCHAR:
       case CHAR:
         if (!(fieldValue instanceof String)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case TIME:
         if (!(fieldValue instanceof GregorianCalendar)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       case TIMESTAMP:
       case DATE:
         if (!(fieldValue instanceof Date)) {
           throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+              getTypeMismatchErrorMessage(fieldValue, fieldType));
         }
         break;
       default:
@@ -152,6 +151,11 @@ public class BeamSqlRow implements Serializable {
     dataValues.set(index, fieldValue);
   }
 
+  private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) {
+    return String.format("[%s](%s) doesn't match type [%s]",
+        fieldValue, fieldValue.getClass(), fieldType);
+  }
+
   public Object getFieldValue(String fieldName) {
     return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
   }
@@ -270,6 +274,7 @@ public class BeamSqlRow implements Serializable {
           return fieldValue;
         }
       case TIMESTAMP:
+      case DATE:
         if (!(fieldValue instanceof Date)) {
           throw new IllegalArgumentException(
               String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index d53ba8d..8be5212 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -91,6 +91,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
         case TIME:
           longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
           break;
+        case DATE:
         case TIMESTAMP:
           longCoder.encode(value.getDate(idx).getTime(), outStream);
           break;
@@ -147,6 +148,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
           calendar.setTime(new Date(longCoder.decode(inStream)));
           record.addField(idx, calendar);
           break;
+        case DATE:
         case TIMESTAMP:
           record.addField(idx, new Date(longCoder.decode(inStream)));
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index ac395d3..6aa6e62 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -50,6 +50,7 @@ public class CalciteUtils {
     JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
     JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
 
+    JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
     JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
     JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 3294592..8c0a28d 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.beam.dsls.sql;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn;
  * Test utilities.
  */
 public class TestUtils {
-
   /**
    * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
    */
@@ -116,6 +116,16 @@ public class TestUtils {
      * <p>Note: check the class javadoc for for detailed example.
      */
     public RowsBuilder addRows(final Object... args) {
+      this.rows.addAll(buildRows(type, Arrays.asList(args)));
+      return this;
+    }
+
+    /**
+     * Add rows to the builder.
+     *
+     * <p>Note: check the class javadoc for for detailed example.
+     */
+    public RowsBuilder addRows(final List args) {
       this.rows.addAll(buildRows(type, args));
       return this;
     }
@@ -169,14 +179,14 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) {
+  public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) {
     List<BeamSqlRow> rows = new ArrayList<>();
     int fieldCount = type.size();
 
-    for (int i = 0; i < args.length; i += fieldCount) {
+    for (int i = 0; i < args.size(); i += fieldCount) {
       BeamSqlRow row = new BeamSqlRow(type);
       for (int j = 0; j < fieldCount; j++) {
-        row.addField(j, args[i + j]);
+        row.addField(j, args.get(i + j));
       }
       rows.add(row);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
new file mode 100644
index 0000000..3d7bf28
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import org.junit.Test;
+
+/**
+ * Integration test for arithmetic operators.
+ */
+public class BeamSqlArithmeticOperatorsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+  private static final BigDecimal ZERO = BigDecimal.valueOf(0.0);
+  private static final BigDecimal ONE0 = BigDecimal.valueOf(1);
+  private static final BigDecimal ONE = BigDecimal.valueOf(1.0);
+  private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0));
+  private static final BigDecimal TWO = BigDecimal.valueOf(2.0);
+
+  @Test
+  public void testPlus() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 + 1", 2)
+        .addExpr("1.0 + 1", TWO)
+        .addExpr("1 + 1.0", TWO)
+        .addExpr("1.0 + 1.0", TWO)
+        .addExpr("c_tinyint + c_tinyint", (byte) 2)
+        .addExpr("c_smallint + c_smallint", (short) 2)
+        .addExpr("c_bigint + c_bigint", 2L)
+        .addExpr("c_decimal + c_decimal", TWO)
+        .addExpr("c_tinyint + c_decimal", TWO)
+        .addExpr("c_float + c_decimal", 2.0)
+        .addExpr("c_double + c_decimal", 2.0)
+        .addExpr("c_float + c_float", 2.0f)
+        .addExpr("c_double + c_float", 2.0)
+        .addExpr("c_double + c_double", 2.0)
+        .addExpr("c_float + c_bigint", 2.0f)
+        .addExpr("c_double + c_bigint", 2.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testPlus_overflow() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_max + c_tinyint_max", -2)
+        .addExpr("c_smallint_max + c_smallint_max", -2)
+        .addExpr("c_integer_max + c_integer_max", -2)
+        // yeah, I know 384L is strange, but since it is already overflowed
+        // what the actualy result is not so important, it is wrong any way.
+        .addExpr("c_bigint_max + c_bigint_max", 384L)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMinus() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 - 1", 0)
+        .addExpr("1.0 - 1", ZERO)
+        .addExpr("1 - 0.0", ONE)
+        .addExpr("1.0 - 1.0", ZERO)
+        .addExpr("c_tinyint - c_tinyint", (byte) 0)
+        .addExpr("c_smallint - c_smallint", (short) 0)
+        .addExpr("c_bigint - c_bigint", 0L)
+        .addExpr("c_decimal - c_decimal", ZERO)
+        .addExpr("c_tinyint - c_decimal", ZERO)
+        .addExpr("c_float - c_decimal", 0.0)
+        .addExpr("c_double - c_decimal", 0.0)
+        .addExpr("c_float - c_float", 0.0f)
+        .addExpr("c_double - c_float", 0.0)
+        .addExpr("c_double - c_double", 0.0)
+        .addExpr("c_float - c_bigint", 0.0f)
+        .addExpr("c_double - c_bigint", 0.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMultiply() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 * 1", 1)
+        .addExpr("1.0 * 1", ONE2)
+        .addExpr("1 * 1.0", ONE2)
+        .addExpr("1.0 * 1.0", ONE2)
+        .addExpr("c_tinyint * c_tinyint", (byte) 1)
+        .addExpr("c_smallint * c_smallint", (short) 1)
+        .addExpr("c_bigint * c_bigint", 1L)
+        .addExpr("c_decimal * c_decimal", ONE2)
+        .addExpr("c_tinyint * c_decimal", ONE2)
+        .addExpr("c_float * c_decimal", 1.0)
+        .addExpr("c_double * c_decimal", 1.0)
+        .addExpr("c_float * c_float", 1.0f)
+        .addExpr("c_double * c_float", 1.0)
+        .addExpr("c_double * c_double", 1.0)
+        .addExpr("c_float * c_bigint", 1.0f)
+        .addExpr("c_double * c_bigint", 1.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testDivide() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 / 1", 1)
+        .addExpr("1.0 / 1", ONE0)
+        .addExpr("1 / 1.0", ONE0)
+        .addExpr("1.0 / 1.0", ONE0)
+        .addExpr("c_tinyint / c_tinyint", (byte) 1)
+        .addExpr("c_smallint / c_smallint", (short) 1)
+        .addExpr("c_bigint / c_bigint", 1L)
+        .addExpr("c_decimal / c_decimal", ONE0)
+        .addExpr("c_tinyint / c_decimal", ONE0)
+        .addExpr("c_float / c_decimal", 1.0)
+        .addExpr("c_double / c_decimal", 1.0)
+        .addExpr("c_float / c_float", 1.0f)
+        .addExpr("c_double / c_float", 1.0)
+        .addExpr("c_double / c_double", 1.0)
+        .addExpr("c_float / c_bigint", 1.0f)
+        .addExpr("c_double / c_bigint", 1.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMod() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("mod(1, 1)", 0)
+        .addExpr("mod(1.0, 1)", 0)
+        .addExpr("mod(1, 1.0)", ZERO)
+        .addExpr("mod(1.0, 1.0)", ZERO)
+        .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0)
+        .addExpr("mod(c_smallint, c_smallint)", (short) 0)
+        .addExpr("mod(c_bigint, c_bigint)", 0L)
+        .addExpr("mod(c_decimal, c_decimal)", ZERO)
+        .addExpr("mod(c_tinyint, c_decimal)", ZERO)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
new file mode 100644
index 0000000..e65e747
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import com.google.common.base.Joiner;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.util.Pair;
+import org.junit.Rule;
+
+/**
+ * Base class for all built-in functions integration tests.
+ */
+public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+  private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
+  static {
+    JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
+    JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
+    JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
+    JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
+    JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
+  }
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  protected PCollection<BeamSqlRow> getTestPCollection() {
+    BeamSqlRecordType type = BeamSqlRecordType.create(
+        Arrays.asList("ts", "c_tinyint", "c_smallint",
+            "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
+            "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
+        Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
+            Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
+            Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
+    );
+    try {
+      return MockedBoundedTable
+          .of(type)
+          .addRows(
+              parseDate("1986-02-15 11:35:26"),
+              (byte) 1,
+              (short) 1,
+              1,
+              1L,
+              1.0f,
+              1.0,
+              BigDecimal.ONE,
+              (byte) 127,
+              (short) 32767,
+              2147483647,
+              9223372036854775807L
+          )
+          .buildIOReader(pipeline)
+          .setCoder(new BeamSqlRowCoder(type));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static Date parseDate(String str) {
+    try {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+      return sdf.parse(str);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * Helper class to make write integration test for built-in functions easier.
+   *
+   * <p>example usage:
+   * <pre>{@code
+   * ExpressionChecker checker = new ExpressionChecker()
+   *   .addExpr("1 + 1", 2)
+   *   .addExpr("1.0 + 1", 2.0)
+   *   .addExpr("1 + 1.0", 2.0)
+   *   .addExpr("1.0 + 1.0", 2.0)
+   *   .addExpr("c_tinyint + c_tinyint", (byte) 2);
+   * checker.buildRunAndCheck(inputCollections);
+   * }</pre>
+   */
+  public class ExpressionChecker {
+    private transient List<Pair<String, Object>> exps = new ArrayList<>();
+
+    public ExpressionChecker addExpr(String expression, Object expectedValue) {
+      exps.add(Pair.of(expression, expectedValue));
+      return this;
+    }
+
+    private String getSql() {
+      List<String> expStrs = new ArrayList<>();
+      for (Pair<String, Object> pair : exps) {
+        expStrs.add(pair.getKey());
+      }
+      return "SELECT " + Joiner.on(",\n  ").join(expStrs) + " FROM PCOLLECTION";
+    }
+
+    /**
+     * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
+     */
+    public void buildRunAndCheck() {
+      PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+      System.out.println("SQL:>\n" + getSql());
+      try {
+        List<String> names = new ArrayList<>();
+        List<Integer> types = new ArrayList<>();
+        List<Object> values = new ArrayList<>();
+
+        for (Pair<String, Object> pair : exps) {
+          names.add(pair.getKey());
+          types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
+          values.add(pair.getValue());
+        }
+
+        PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+        PAssert.that(rows).containsInAnyOrder(
+            TestUtils.RowsBuilder
+                .of(BeamSqlRecordType.create(names, types))
+                .addRows(values)
+                .getRows()
+        );
+        inputCollection.getPipeline().run();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
index 11465f5..e28581f 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -17,76 +17,35 @@
  */
 package org.apache.beam.dsls.sql.integrationtest;
 
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Integration test for string functions.
  */
-public class BeamSqlStringFunctionsIntegrationTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
+public class BeamSqlStringFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
   @Test
   public void testStringFunctions() throws Exception {
-    String sql = "SELECT "
-        + "'hello' || ' world' as concat,"
-        + "CHAR_LENGTH('hello') as cl,"
-        + "CHARACTER_LENGTH('hello') as cl1,"
-        + "UPPER('hello') as up,"
-        + "LOWER('HELLO') as lo,"
-        + "POSITION('world' IN 'helloworld') as po,"
-        + "POSITION('world' IN 'helloworldworld' FROM 7) as po1,"
-        + "TRIM(' hello ') as tr,"
-        + "TRIM(LEADING ' ' FROM ' hello ') as tr1,"
-        + "TRIM(TRAILING ' ' FROM ' hello ') as tr2,"
-        + "TRIM(BOTH ' ' FROM ' hello ') as tr3,"
-        + "OVERLAY('w3333333rce' PLACING 'resou' FROM 3) as ol,"
-        + "SUBSTRING('hello' FROM 2) as ss,"
-        + "SUBSTRING('hello' FROM 2 FOR 2) as ss1,"
-        + "INITCAP('hello world') as ss1"
-    ;
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("'hello' || ' world'", "hello world")
+        .addExpr("CHAR_LENGTH('hello')", 5)
+        .addExpr("CHARACTER_LENGTH('hello')", 5)
+        .addExpr("UPPER('hello')", "HELLO")
+        .addExpr("LOWER('HELLO')", "hello")
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            // 1 -> 5
-            Types.VARCHAR, "concat",
-            Types.INTEGER, "cl",
-            Types.INTEGER, "cl1",
-            Types.VARCHAR, "up",
-            Types.VARCHAR, "lo",
-            // 6 -> 10
-            Types.INTEGER, "po",
-            Types.INTEGER, "po1",
-            Types.VARCHAR, "tr",
-            Types.VARCHAR, "tr1",
-            Types.VARCHAR, "tr2",
-            // 11 -> 15
-            Types.VARCHAR, "tr3",
-            Types.VARCHAR, "ol",
-            Types.VARCHAR, "ss",
-            Types.VARCHAR, "ss1",
-            Types.VARCHAR, "ic"
-        ).addRows(
-            // 1 -> 5(lo)
-            "hello world", 5, 5, "HELLO", "hello",
-            // 6 -> 10()
-            5, 10, "hello", "hello ", " hello",
-            // 11 -> 15
-            "hello", "w3resou3rce", "ello", "el", "Hello World"
-        ).getRows());
-    pipeline.run();
-  }
+        .addExpr("POSITION('world' IN 'helloworld')", 5)
+        .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+        .addExpr("TRIM(' hello ')", "hello")
+        .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
+        .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
 
+        .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
+        .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
+        .addExpr("SUBSTRING('hello' FROM 2)", "ello")
+        .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
+        .addExpr("INITCAP('hello world')", "Hello World")
+        ;
+
+    checker.buildRunAndCheck();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
index fc28180..a34f109 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
@@ -82,10 +81,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test public void testPlus() {
     List<BeamSqlExpression> operands = new ArrayList<>();
 
-    // integer + integer => long
+    // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
 
     // integer + long => long
     operands.clear();
@@ -99,11 +98,11 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
 
-    // float + long => double
+    // float + long => float
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(Double.valueOf(Double.valueOf(1.1F) + 1),
+    assertEquals(Float.valueOf(1.1F + 1),
         new BeamSqlPlusExpression(operands).evaluate(record).getValue());
 
     // double + long => double
@@ -119,7 +118,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
 
     // integer + long => long
     operands.clear();
@@ -137,8 +136,8 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(Double.valueOf(Double.valueOf(2.1F) - 1),
-        new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1F - 1L,
+        new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
 
     // double + long => double
     operands.clear();
@@ -150,10 +149,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test public void testMultiply() {
     List<BeamSqlExpression> operands = new ArrayList<>();
 
-    // integer + integer => long
+    // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
 
     // integer + long => long
     operands.clear();
@@ -171,7 +170,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(Double.valueOf(Double.valueOf(2.1F) * 1),
+    assertEquals(Float.valueOf(2.1F * 1L),
         new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
 
     // double + long => double
@@ -184,10 +183,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test public void testDivide() {
     List<BeamSqlExpression> operands = new ArrayList<>();
 
-    // integer + integer => long
+    // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
 
     // integer + long => long
     operands.clear();
@@ -205,7 +204,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(Double.valueOf(Double.valueOf(2.1F) / 1),
+    assertEquals(2.1F / 1,
         new BeamSqlDivideExpression(operands).evaluate(record).getValue());
 
     // double + long => double
@@ -221,7 +220,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
 
     // integer + long => long
     operands.clear();
@@ -234,18 +233,5 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
-
-    // float + long => double
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 3.1F));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(Double.valueOf(Double.valueOf(3.1F) % 2),
-        new BeamSqlModExpression(operands).evaluate(record).getValue());
-
-    // double + long => double
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(1.1, new BeamSqlModExpression(operands).evaluate(record).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
index 0fb8a80..84f49a9 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -21,6 +21,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
@@ -65,6 +66,13 @@ public class MockedBoundedTable extends MockedTable {
     return new MockedBoundedTable(buildBeamSqlRecordType(args));
   }
 
+  /**
+   * Build a mocked bounded table with the specified type.
+   */
+  public static MockedBoundedTable of(final BeamSqlRecordType type) {
+    return new MockedBoundedTable(type);
+  }
+
 
   /**
    * Add rows to the builder.
@@ -80,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public MockedBoundedTable addRows(Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
     this.rows.addAll(rows);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
index 12d8d37..0f8c912 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -22,6 +22,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
@@ -84,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public MockedUnboundedTable addRows(Duration duration, Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
     // record the watermark + rows
     this.timestampedRows.add(Pair.of(duration, rows));
     return this;