You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:27 UTC

[04/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
new file mode 100644
index 0000000..bd0d3ba
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.Iterator;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for date functions.
+ */
+public class BeamSqlDateFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  @Test public void testDateTimeFunctions() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
+        .addExpr("YEAR(ts)", 1986L)
+        .addExpr("QUARTER(ts)", 1L)
+        .addExpr("MONTH(ts)", 2L)
+        .addExpr("WEEK(ts)", 7L)
+        .addExpr("DAYOFMONTH(ts)", 15L)
+        .addExpr("DAYOFYEAR(ts)", 46L)
+        .addExpr("DAYOFWEEK(ts)", 7L)
+        .addExpr("HOUR(ts)", 11L)
+        .addExpr("MINUTE(ts)", 35L)
+        .addExpr("SECOND(ts)", 26L)
+        .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00"))
+        .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00"))
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test public void testDateTimeFunctions_currentTime() throws Exception {
+    String sql = "SELECT "
+        + "LOCALTIME as l,"
+        + "LOCALTIMESTAMP as l1,"
+        + "CURRENT_DATE as c1,"
+        + "CURRENT_TIME as c2,"
+        + "CURRENT_TIMESTAMP as c3"
+        + " FROM PCOLLECTION"
+        ;
+    PCollection<BeamSqlRow> rows = getTestPCollection().apply(
+        BeamSql.simpleQuery(sql));
+    PAssert.that(rows).satisfies(new Checker());
+    pipeline.run();
+  }
+
+  private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+    @Override public Void apply(Iterable<BeamSqlRow> input) {
+      Iterator<BeamSqlRow> iter = input.iterator();
+      assertTrue(iter.hasNext());
+      BeamSqlRow row = iter.next();
+        // LOCALTIME
+      Date date = new Date();
+      assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(1).getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(2).getTime() < 1000);
+      assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(4).getTime() < 1000);
+      assertFalse(iter.hasNext());
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..4ed1f86
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for logical functions.
+ */
+public class BeamSqlLogicalFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  @Test
+  public void testStringFunctions() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_integer = 1 AND c_bigint = 1", true)
+        .addExpr("c_integer = 1 OR c_bigint = 2", true)
+        .addExpr("NOT c_bigint = 2", true)
+        .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true)
+        .addExpr("c_integer = 2 AND c_bigint = 1", false)
+        .addExpr("c_integer = 2 OR c_bigint = 2", false)
+        .addExpr("NOT c_bigint = 1", false)
+        .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
new file mode 100644
index 0000000..9f7d917
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.junit.Test;
+
+/**
+ * Integration test for built-in MATH functions.
+ */
+public class BeamSqlMathFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  private static final int INTEGER_VALUE = 1;
+  private static final long LONG_VALUE = 1L;
+  private static final short SHORT_VALUE = 1;
+  private static final byte BYTE_VALUE = 1;
+  private static final double DOUBLE_VALUE = 1.0;
+  private static final float FLOAT_VALUE = 1.0f;
+  private static final BigDecimal DECIMAL_VALUE = new BigDecimal(1);
+
+  @Test
+  public void testAbs() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ABS(c_integer)", Math.abs(INTEGER_VALUE))
+        .addExpr("ABS(c_bigint)", Math.abs(LONG_VALUE))
+        .addExpr("ABS(c_smallint)", (short) Math.abs(SHORT_VALUE))
+        .addExpr("ABS(c_tinyint)", (byte) Math.abs(BYTE_VALUE))
+        .addExpr("ABS(c_double)", Math.abs(DOUBLE_VALUE))
+        .addExpr("ABS(c_float)", Math.abs(FLOAT_VALUE))
+        .addExpr("ABS(c_decimal)", new BigDecimal(Math.abs(DECIMAL_VALUE.doubleValue())))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testSqrt() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("SQRT(c_integer)", Math.sqrt(INTEGER_VALUE))
+        .addExpr("SQRT(c_bigint)", Math.sqrt(LONG_VALUE))
+        .addExpr("SQRT(c_smallint)", Math.sqrt(SHORT_VALUE))
+        .addExpr("SQRT(c_tinyint)", Math.sqrt(BYTE_VALUE))
+        .addExpr("SQRT(c_double)", Math.sqrt(DOUBLE_VALUE))
+        .addExpr("SQRT(c_float)", Math.sqrt(FLOAT_VALUE))
+        .addExpr("SQRT(c_decimal)", Math.sqrt(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testRound() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ROUND(c_integer, 0)", SqlFunctions.sround(INTEGER_VALUE, 0))
+        .addExpr("ROUND(c_bigint, 0)", SqlFunctions.sround(LONG_VALUE, 0))
+        .addExpr("ROUND(c_smallint, 0)", (short) SqlFunctions.sround(SHORT_VALUE, 0))
+        .addExpr("ROUND(c_tinyint, 0)", (byte) SqlFunctions.sround(BYTE_VALUE, 0))
+        .addExpr("ROUND(c_double, 0)", SqlFunctions.sround(DOUBLE_VALUE, 0))
+        .addExpr("ROUND(c_float, 0)", (float) SqlFunctions.sround(FLOAT_VALUE, 0))
+        .addExpr("ROUND(c_decimal, 0)",
+            new BigDecimal(SqlFunctions.sround(DECIMAL_VALUE.doubleValue(), 0)))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLn() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("LN(c_integer)", Math.log(INTEGER_VALUE))
+        .addExpr("LN(c_bigint)", Math.log(LONG_VALUE))
+        .addExpr("LN(c_smallint)", Math.log(SHORT_VALUE))
+        .addExpr("LN(c_tinyint)", Math.log(BYTE_VALUE))
+        .addExpr("LN(c_double)", Math.log(DOUBLE_VALUE))
+        .addExpr("LN(c_float)", Math.log(FLOAT_VALUE))
+        .addExpr("LN(c_decimal)", Math.log(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLog10() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("LOG10(c_integer)", Math.log10(INTEGER_VALUE))
+        .addExpr("LOG10(c_bigint)", Math.log10(LONG_VALUE))
+        .addExpr("LOG10(c_smallint)", Math.log10(SHORT_VALUE))
+        .addExpr("LOG10(c_tinyint)", Math.log10(BYTE_VALUE))
+        .addExpr("LOG10(c_double)", Math.log10(DOUBLE_VALUE))
+        .addExpr("LOG10(c_float)", Math.log10(FLOAT_VALUE))
+        .addExpr("LOG10(c_decimal)", Math.log10(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testExp() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("EXP(c_integer)", Math.exp(INTEGER_VALUE))
+        .addExpr("EXP(c_bigint)", Math.exp(LONG_VALUE))
+        .addExpr("EXP(c_smallint)", Math.exp(SHORT_VALUE))
+        .addExpr("EXP(c_tinyint)", Math.exp(BYTE_VALUE))
+        .addExpr("EXP(c_double)", Math.exp(DOUBLE_VALUE))
+        .addExpr("EXP(c_float)", Math.exp(FLOAT_VALUE))
+        .addExpr("EXP(c_decimal)", Math.exp(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testAcos() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ACOS(c_integer)", Math.acos(INTEGER_VALUE))
+        .addExpr("ACOS(c_bigint)", Math.acos(LONG_VALUE))
+        .addExpr("ACOS(c_smallint)", Math.acos(SHORT_VALUE))
+        .addExpr("ACOS(c_tinyint)", Math.acos(BYTE_VALUE))
+        .addExpr("ACOS(c_double)", Math.acos(DOUBLE_VALUE))
+        .addExpr("ACOS(c_float)", Math.acos(FLOAT_VALUE))
+        .addExpr("ACOS(c_decimal)", Math.acos(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testAsin() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ASIN(c_integer)", Math.asin(INTEGER_VALUE))
+        .addExpr("ASIN(c_bigint)", Math.asin(LONG_VALUE))
+        .addExpr("ASIN(c_smallint)", Math.asin(SHORT_VALUE))
+        .addExpr("ASIN(c_tinyint)", Math.asin(BYTE_VALUE))
+        .addExpr("ASIN(c_double)", Math.asin(DOUBLE_VALUE))
+        .addExpr("ASIN(c_float)", Math.asin(FLOAT_VALUE))
+        .addExpr("ASIN(c_decimal)", Math.asin(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testAtan() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ATAN(c_integer)", Math.atan(INTEGER_VALUE))
+        .addExpr("ATAN(c_bigint)", Math.atan(LONG_VALUE))
+        .addExpr("ATAN(c_smallint)", Math.atan(SHORT_VALUE))
+        .addExpr("ATAN(c_tinyint)", Math.atan(BYTE_VALUE))
+        .addExpr("ATAN(c_double)", Math.atan(DOUBLE_VALUE))
+        .addExpr("ATAN(c_float)", Math.atan(FLOAT_VALUE))
+        .addExpr("ATAN(c_decimal)", Math.atan(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testCot() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("COT(c_integer)", 1.0d / Math.tan(INTEGER_VALUE))
+        .addExpr("COT(c_bigint)", 1.0d / Math.tan(LONG_VALUE))
+        .addExpr("COT(c_smallint)", 1.0d / Math.tan(SHORT_VALUE))
+        .addExpr("COT(c_tinyint)", 1.0d / Math.tan(BYTE_VALUE))
+        .addExpr("COT(c_double)", 1.0d / Math.tan(DOUBLE_VALUE))
+        .addExpr("COT(c_float)", 1.0d / Math.tan(FLOAT_VALUE))
+        .addExpr("COT(c_decimal)", 1.0d / Math.tan(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testDegrees() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("DEGREES(c_integer)", Math.toDegrees(INTEGER_VALUE))
+        .addExpr("DEGREES(c_bigint)", Math.toDegrees(LONG_VALUE))
+        .addExpr("DEGREES(c_smallint)", Math.toDegrees(SHORT_VALUE))
+        .addExpr("DEGREES(c_tinyint)", Math.toDegrees(BYTE_VALUE))
+        .addExpr("DEGREES(c_double)", Math.toDegrees(DOUBLE_VALUE))
+        .addExpr("DEGREES(c_float)", Math.toDegrees(FLOAT_VALUE))
+        .addExpr("DEGREES(c_decimal)", Math.toDegrees(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testRadians() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("RADIANS(c_integer)", Math.toRadians(INTEGER_VALUE))
+        .addExpr("RADIANS(c_bigint)", Math.toRadians(LONG_VALUE))
+        .addExpr("RADIANS(c_smallint)", Math.toRadians(SHORT_VALUE))
+        .addExpr("RADIANS(c_tinyint)", Math.toRadians(BYTE_VALUE))
+        .addExpr("RADIANS(c_double)", Math.toRadians(DOUBLE_VALUE))
+        .addExpr("RADIANS(c_float)", Math.toRadians(FLOAT_VALUE))
+        .addExpr("RADIANS(c_decimal)", Math.toRadians(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testCos() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("COS(c_integer)", Math.cos(INTEGER_VALUE))
+        .addExpr("COS(c_bigint)", Math.cos(LONG_VALUE))
+        .addExpr("COS(c_smallint)", Math.cos(SHORT_VALUE))
+        .addExpr("COS(c_tinyint)", Math.cos(BYTE_VALUE))
+        .addExpr("COS(c_double)", Math.cos(DOUBLE_VALUE))
+        .addExpr("COS(c_float)", Math.cos(FLOAT_VALUE))
+        .addExpr("COS(c_decimal)", Math.cos(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testSin() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("SIN(c_integer)", Math.sin(INTEGER_VALUE))
+        .addExpr("SIN(c_bigint)", Math.sin(LONG_VALUE))
+        .addExpr("SIN(c_smallint)", Math.sin(SHORT_VALUE))
+        .addExpr("SIN(c_tinyint)", Math.sin(BYTE_VALUE))
+        .addExpr("SIN(c_double)", Math.sin(DOUBLE_VALUE))
+        .addExpr("SIN(c_float)", Math.sin(FLOAT_VALUE))
+        .addExpr("SIN(c_decimal)", Math.sin(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testTan() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TAN(c_integer)", Math.tan(INTEGER_VALUE))
+        .addExpr("TAN(c_bigint)", Math.tan(LONG_VALUE))
+        .addExpr("TAN(c_smallint)", Math.tan(SHORT_VALUE))
+        .addExpr("TAN(c_tinyint)", Math.tan(BYTE_VALUE))
+        .addExpr("TAN(c_double)", Math.tan(DOUBLE_VALUE))
+        .addExpr("TAN(c_float)", Math.tan(FLOAT_VALUE))
+        .addExpr("TAN(c_decimal)", Math.tan(DECIMAL_VALUE.doubleValue()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testSign() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("SIGN(c_integer)", Integer.signum(INTEGER_VALUE))
+        .addExpr("SIGN(c_bigint)", (long) (Long.signum(LONG_VALUE)))
+        .addExpr("SIGN(c_smallint)", (short) (Integer.signum(SHORT_VALUE)))
+        .addExpr("SIGN(c_tinyint)", (byte) Integer.signum(BYTE_VALUE))
+        .addExpr("SIGN(c_double)", Math.signum(DOUBLE_VALUE))
+        .addExpr("SIGN(c_float)", Math.signum(FLOAT_VALUE))
+        .addExpr("SIGN(c_decimal)", BigDecimal.valueOf(DECIMAL_VALUE.signum()))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testPower() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("POWER(c_integer, 2)", Math.pow(INTEGER_VALUE, 2))
+        .addExpr("POWER(c_bigint, 2)", Math.pow(LONG_VALUE, 2))
+        .addExpr("POWER(c_smallint, 2)", Math.pow(SHORT_VALUE, 2))
+        .addExpr("POWER(c_tinyint, 2)", Math.pow(BYTE_VALUE, 2))
+        .addExpr("POWER(c_double, 2)", Math.pow(DOUBLE_VALUE, 2))
+        .addExpr("POWER(c_float, 2)", Math.pow(FLOAT_VALUE, 2))
+        .addExpr("POWER(c_decimal, 2)", Math.pow(DECIMAL_VALUE.doubleValue(), 2))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testPi() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("PI", Math.PI)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testAtan2() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("ATAN2(c_integer, 2)", Math.atan2(INTEGER_VALUE, 2))
+        .addExpr("ATAN2(c_bigint, 2)", Math.atan2(LONG_VALUE, 2))
+        .addExpr("ATAN2(c_smallint, 2)", Math.atan2(SHORT_VALUE, 2))
+        .addExpr("ATAN2(c_tinyint, 2)", Math.atan2(BYTE_VALUE, 2))
+        .addExpr("ATAN2(c_double, 2)", Math.atan2(DOUBLE_VALUE, 2))
+        .addExpr("ATAN2(c_float, 2)", Math.atan2(FLOAT_VALUE, 2))
+        .addExpr("ATAN2(c_decimal, 2)", Math.atan2(DECIMAL_VALUE.doubleValue(), 2))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testTruncate() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TRUNCATE(c_integer, 2)", SqlFunctions.struncate(INTEGER_VALUE, 2))
+        .addExpr("TRUNCATE(c_bigint, 2)", SqlFunctions.struncate(LONG_VALUE, 2))
+        .addExpr("TRUNCATE(c_smallint, 2)", (short) SqlFunctions.struncate(SHORT_VALUE, 2))
+        .addExpr("TRUNCATE(c_tinyint, 2)", (byte) SqlFunctions.struncate(BYTE_VALUE, 2))
+        .addExpr("TRUNCATE(c_double, 2)", SqlFunctions.struncate(DOUBLE_VALUE, 2))
+        .addExpr("TRUNCATE(c_float, 2)", (float) SqlFunctions.struncate(FLOAT_VALUE, 2))
+        .addExpr("TRUNCATE(c_decimal, 2)", SqlFunctions.struncate(DECIMAL_VALUE, 2))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testRand() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("RAND(c_integer)", new Random(INTEGER_VALUE).nextDouble())
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testRandInteger() throws Exception{
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("RAND_INTEGER(c_integer, c_integer)",
+            new Random(INTEGER_VALUE).nextInt(INTEGER_VALUE))
+        ;
+
+    checker.buildRunAndCheck();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
new file mode 100644
index 0000000..e28581f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for string functions.
+ */
+public class BeamSqlStringFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  @Test
+  public void testStringFunctions() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("'hello' || ' world'", "hello world")
+        .addExpr("CHAR_LENGTH('hello')", 5)
+        .addExpr("CHARACTER_LENGTH('hello')", 5)
+        .addExpr("UPPER('hello')", "HELLO")
+        .addExpr("LOWER('HELLO')", "hello")
+
+        .addExpr("POSITION('world' IN 'helloworld')", 5)
+        .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+        .addExpr("TRIM(' hello ')", "hello")
+        .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
+        .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
+
+        .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
+        .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
+        .addExpr("SUBSTRING('hello' FROM 2)", "ello")
+        .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
+        .addExpr("INITCAP('hello world')", "Hello World")
+        ;
+
+    checker.buildRunAndCheck();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
new file mode 100644
index 0000000..15d5a52
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSqlFnExecutor}.
+ */
+public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testBeamFilterRel() {
+    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
+            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));
+
+    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
+        condition);
+
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel);
+    executor.prepare();
+
+    Assert.assertEquals(1, executor.exps.size());
+
+    BeamSqlExpression l1Exp = executor.exps.get(0);
+    assertTrue(l1Exp instanceof BeamSqlAndExpression);
+    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
+
+    Assert.assertEquals(2, l1Exp.getOperands().size());
+    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
+    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
+
+    assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression);
+    assertTrue(l1Right instanceof BeamSqlEqualsExpression);
+
+    Assert.assertEquals(2, l1Left.getOperands().size());
+    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
+    BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
+    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
+
+    Assert.assertEquals(2, l1Right.getOperands().size());
+    BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
+    BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
+    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
+  }
+
+  @Test
+  public void testBeamProjectRel() {
+    BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
+        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+        rexBuilder.identityProjects(relDataType), relDataType);
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode);
+
+    executor.prepare();
+    Assert.assertEquals(4, executor.exps.size());
+    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
+  }
+
+
+  @Test
+  public void testBuildExpression_logical() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlAndExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OR,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOrExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlNotExpression);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_andOr_invalidOperand() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperand() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperandCount() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(true)
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  @Test
+  public void testBuildExpression_arithmetic() {
+    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+  }
+
+  private void testBuildArithmeticExpression(SqlOperator fn,
+      Class<? extends BeamSqlExpression> clazz) {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(fn, Arrays.asList(
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+    ));
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+
+    assertTrue(exp.getClass().equals(clazz));
+  }
+
+  @Test
+  public void testBuildExpression_string()  {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello "),
+            rexBuilder.makeLiteral("world")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlConcatExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello"),
+            rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+                rexBuilder.makeBigintLiteral(BigDecimal.ONE))
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCharLengthExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlUpperExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlLowerExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlInitCapExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
+        Arrays.asList(
+            rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlTrimExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCaseExpression);
+  }
+
+  @Test
+  public void testBuildExpression_date() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
+    calendar.setTime(new Date());
+
+    // CEIL
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CEIL,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDateCeilExpression);
+
+    // FLOOR
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDateFloorExpression);
+
+    // EXTRACT == EXTRACT_DATE?
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT,
+        Arrays.asList(
+            rexBuilder.makeFlag(TimeUnitRange.MONTH),
+            rexBuilder.makeDateLiteral(calendar)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlExtractExpression);
+
+    // CURRENT_DATE
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCurrentDateExpression);
+
+    // LOCALTIME
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCurrentTimeExpression);
+
+    // LOCALTIMESTAMP
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCurrentTimestampExpression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
new file mode 100644
index 0000000..d7b54c7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
+import org.apache.beam.dsls.sql.planner.BeamRuleSets;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link BeamSqlExpression}.
+ */
+public class BeamSqlFnExecutorTestBase {
+  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
+
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  public static RelDataType relDataType;
+
+  public static BeamSqlRowType beamRowType;
+  public static BeamSqlRow record;
+
+  public static RelBuilder relBuilder;
+
+  @BeforeClass
+  public static void prepare() {
+    relDataType = TYPE_FACTORY.builder()
+        .add("order_id", SqlTypeName.BIGINT)
+        .add("site_id", SqlTypeName.INTEGER)
+        .add("price", SqlTypeName.DOUBLE)
+        .add("order_time", SqlTypeName.BIGINT).build();
+
+    beamRowType = CalciteUtils.toBeamRowType(relDataType);
+    record = new BeamSqlRow(beamRowType);
+
+    record.addField(0, 1234567L);
+    record.addField(1, 0);
+    record.addField(2, 8.9);
+    record.addField(3, 1234567L);
+
+    SchemaPlus schema = Frameworks.createRootSchema(true);
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+    FrameworkConfig config = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+
+    relBuilder = RelBuilder.create(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
new file mode 100644
index 0000000..8ff105e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlIsNullExpression} and
+ * {@link BeamSqlIsNotNullExpression}.
+ */
+public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testIsNull() {
+    BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testIsNotNull() {
+    BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
new file mode 100644
index 0000000..01c57a8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}.
+ */
+public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testAnd() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testOr() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
new file mode 100644
index 0000000..39eec76
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCaseExpression.
+ */
+public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertTrue(new BeamSqlCaseExpression(operands).accept());
+
+    // even param count
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+    // `when` type error
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+    // `then` type mixing
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertEquals("hello", new BeamSqlCaseExpression(operands)
+        .evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertEquals("world", new BeamSqlCaseExpression(operands)
+        .evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertEquals("hello1", new BeamSqlCaseExpression(operands)
+        .evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..c2fd68d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link BeamSqlCastExpression}.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  private List<BeamSqlExpression> operands;
+
+  @Before
+  public void setup() {
+    operands = new ArrayList<>();
+  }
+
+  @Test
+  public void testForOperands() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
+    Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
+  }
+
+  @Test
+  public void testForIntegerToBigintTypeCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertEquals(5L,
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForDoubleToBigIntCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
+    Assert.assertEquals(5L,
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForIntegerToDateCast() {
+    // test for yyyyMMdd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testyyyyMMddDateFormat() {
+    //test for yyyy-MM-dd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testyyMMddDateFormat() {
+    // test for yy.MM.dd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testForTimestampCastExpression() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
+    Assert.assertEquals(SqlTypeName.TIMESTAMP,
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+            .getOutputType());
+  }
+
+  @Test
+  public void testDateTimeFormatWithMillis() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormatWithTimezone() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormat() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testForCastTypeNotSupported() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
new file mode 100644
index 0000000..50f1b78
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlCompareExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for the collections of {@link BeamSqlCompareExpression}.
+ */
+public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testEqual() {
+    BeamSqlEqualsExpression exp1 = new BeamSqlEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlEqualsExpression exp2 = new BeamSqlEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLargerThan(){
+    BeamSqlGreaterThanExpression exp1 = new BeamSqlGreaterThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlGreaterThanExpression exp2 = new BeamSqlGreaterThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLargerThanEqual(){
+    BeamSqlGreaterThanOrEqualsExpression exp1 = new BeamSqlGreaterThanOrEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlGreaterThanOrEqualsExpression exp2 = new BeamSqlGreaterThanOrEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLessThan(){
+    BeamSqlLessThanExpression exp1 = new BeamSqlLessThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlLessThanExpression exp2 = new BeamSqlLessThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLessThanEqual(){
+    BeamSqlLessThanOrEqualsExpression exp1 = new BeamSqlLessThanOrEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
+            BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlLessThanOrEqualsExpression exp2 = new BeamSqlLessThanOrEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
+            BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testNotEqual(){
+    BeamSqlNotEqualsExpression exp1 = new BeamSqlNotEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlNotEqualsExpression exp2 = new BeamSqlNotEqualsExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
new file mode 100644
index 0000000..76e7a5a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlInputRefExpression}.
+ */
+public class BeamSqlInputRefExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testRefInRange() {
+    BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    Assert.assertEquals(record.getLong(0), ref0.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1);
+    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2);
+    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3);
+    Assert.assertEquals(record.getLong(3), ref3.evaluate(record).getValue());
+  }
+
+
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testRefOutOfRange(){
+    BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4);
+    ref.evaluate(record).getValue();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testTypeUnMatch(){
+    BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0);
+    ref.evaluate(record).getValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
new file mode 100644
index 0000000..eb51b6b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlPrimitive}.
+ *
+ */
+public class BeamSqlPrimitiveTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testPrimitiveInt(){
+    BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPrimitiveTypeUnMatch1(){
+    BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+  @Test(expected = IllegalArgumentException.class)
+  public void testPrimitiveTypeUnMatch2(){
+    BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+  @Test(expected = IllegalArgumentException.class)
+  public void testPrimitiveTypeUnMatch3(){
+    BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+  @Test(expected = IllegalArgumentException.class)
+  public void testPrimitiveTypeUnMatch4(){
+    BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
new file mode 100644
index 0000000..897a351
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlReinterpretExpression}.
+ */
+public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, new Date()));
+    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date()));
+    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+    operands.clear();
+    GregorianCalendar calendar = new GregorianCalendar();
+    calendar.setTime(new Date());
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+    // currently only support reinterpret DATE
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+    // currently only support convert to BIGINT
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+    assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.TINYINT).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    Date d = new Date();
+    d.setTime(1000);
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
+    assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
+        .evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
new file mode 100644
index 0000000..e1660b4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlUdfExpression.
+ */
+public class BeamSqlUdfExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testUdf() throws NoSuchMethodException, SecurityException {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+
+    BeamSqlUdfExpression exp = new BeamSqlUdfExpression(
+        UdfFn.class.getMethod("negative", Integer.class), operands, SqlTypeName.INTEGER);
+
+    Assert.assertEquals(-10, exp.evaluate(record).getValue());
+  }
+
+  /**
+   * UDF example.
+   */
+  public static final class UdfFn {
+    public static int negative(Integer number) {
+      return number == null ? 0 : 0 - number;
+    }
+  }
+}