You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:48 UTC

[25/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
new file mode 100644
index 0000000..ffc6833
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import com.google.common.base.Joiner;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.util.Pair;
+import org.junit.Rule;
+
+/**
+ * Base class for all built-in functions integration tests.
+ */
+public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+  private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
+  static {
+    JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
+    JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
+    JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
+    JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
+    JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
+    JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
+    JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
+  }
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  protected PCollection<BeamSqlRow> getTestPCollection() {
+    BeamSqlRowType type = BeamSqlRowType.create(
+        Arrays.asList("ts", "c_tinyint", "c_smallint",
+            "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
+            "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
+        Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
+            Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
+            Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
+    );
+    try {
+      return MockedBoundedTable
+          .of(type)
+          .addRows(
+              parseDate("1986-02-15 11:35:26"),
+              (byte) 1,
+              (short) 1,
+              1,
+              1L,
+              1.0f,
+              1.0,
+              BigDecimal.ONE,
+              (byte) 127,
+              (short) 32767,
+              2147483647,
+              9223372036854775807L
+          )
+          .buildIOReader(pipeline)
+          .setCoder(new BeamSqlRowCoder(type));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static Date parseDate(String str) {
+    try {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+      return sdf.parse(str);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * Helper class to make writing integration tests for built-in functions easier.
+   *
+   * <p>Example usage:
+   * <pre>{@code
+   * ExpressionChecker checker = new ExpressionChecker()
+   *   .addExpr("1 + 1", 2)
+   *   .addExpr("1.0 + 1", 2.0)
+   *   .addExpr("1 + 1.0", 2.0)
+   *   .addExpr("1.0 + 1.0", 2.0)
+   *   .addExpr("c_tinyint + c_tinyint", (byte) 2);
+   * checker.buildRunAndCheck();
+   * }</pre>
+   */
+  public class ExpressionChecker {
+    private transient List<Pair<String, Object>> exps = new ArrayList<>();
+
+    public ExpressionChecker addExpr(String expression, Object expectedValue) {
+      exps.add(Pair.of(expression, expectedValue));
+      return this;
+    }
+
+    private String getSql() {
+      List<String> expStrs = new ArrayList<>();
+      for (Pair<String, Object> pair : exps) {
+        expStrs.add(pair.getKey());
+      }
+      return "SELECT " + Joiner.on(",\n  ").join(expStrs) + " FROM PCOLLECTION";
+    }
+
+    /**
+     * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
+     */
+    public void buildRunAndCheck() {
+      PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+      System.out.println("SQL:>\n" + getSql());
+      try {
+        List<String> names = new ArrayList<>();
+        List<Integer> types = new ArrayList<>();
+        List<Object> values = new ArrayList<>();
+
+        for (Pair<String, Object> pair : exps) {
+          names.add(pair.getKey());
+          types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
+          values.add(pair.getValue());
+        }
+
+        PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+        PAssert.that(rows).containsInAnyOrder(
+            TestUtils.RowsBuilder
+                .of(BeamSqlRowType.create(names, types))
+                .addRows(values)
+                .getRows()
+        );
+        inputCollection.getPipeline().run();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
new file mode 100644
index 0000000..14de5b6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for comparison operators and the {@code IS [NOT] NULL} predicates.
+ */
+public class BeamSqlComparisonOperatorsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+  @Test
+  public void testEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_1 = c_tinyint_1", true)
+        .addExpr("c_tinyint_1 = c_tinyint_2", false)
+        .addExpr("c_smallint_1 = c_smallint_1", true)
+        .addExpr("c_smallint_1 = c_smallint_2", false)
+        .addExpr("c_integer_1 = c_integer_1", true)
+        .addExpr("c_integer_1 = c_integer_2", false)
+        .addExpr("c_bigint_1 = c_bigint_1", true)
+        .addExpr("c_bigint_1 = c_bigint_2", false)
+        .addExpr("c_float_1 = c_float_1", true)
+        .addExpr("c_float_1 = c_float_2", false)
+        .addExpr("c_double_1 = c_double_1", true)
+        .addExpr("c_double_1 = c_double_2", false)
+        .addExpr("c_decimal_1 = c_decimal_1", true)
+        .addExpr("c_decimal_1 = c_decimal_2", false)
+        .addExpr("c_varchar_1 = c_varchar_1", true)
+        .addExpr("c_varchar_1 = c_varchar_2", false)
+        .addExpr("c_boolean_true = c_boolean_true", true)
+        .addExpr("c_boolean_true = c_boolean_false", false)
+
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testNotEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_1 <> c_tinyint_1", false)
+        .addExpr("c_tinyint_1 <> c_tinyint_2", true)
+        .addExpr("c_smallint_1 <> c_smallint_1", false)
+        .addExpr("c_smallint_1 <> c_smallint_2", true)
+        .addExpr("c_integer_1 <> c_integer_1", false)
+        .addExpr("c_integer_1 <> c_integer_2", true)
+        .addExpr("c_bigint_1 <> c_bigint_1", false)
+        .addExpr("c_bigint_1 <> c_bigint_2", true)
+        .addExpr("c_float_1 <> c_float_1", false)
+        .addExpr("c_float_1 <> c_float_2", true)
+        .addExpr("c_double_1 <> c_double_1", false)
+        .addExpr("c_double_1 <> c_double_2", true)
+        .addExpr("c_decimal_1 <> c_decimal_1", false)
+        .addExpr("c_decimal_1 <> c_decimal_2", true)
+        .addExpr("c_varchar_1 <> c_varchar_1", false)
+        .addExpr("c_varchar_1 <> c_varchar_2", true)
+        .addExpr("c_boolean_true <> c_boolean_true", false)
+        .addExpr("c_boolean_true <> c_boolean_false", true)
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testGreaterThan() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 > c_tinyint_1", true)
+        .addExpr("c_tinyint_1 > c_tinyint_1", false)
+        .addExpr("c_tinyint_1 > c_tinyint_2", false)
+
+        .addExpr("c_smallint_2 > c_smallint_1", true)
+        .addExpr("c_smallint_1 > c_smallint_1", false)
+        .addExpr("c_smallint_1 > c_smallint_2", false)
+
+        .addExpr("c_integer_2 > c_integer_1", true)
+        .addExpr("c_integer_1 > c_integer_1", false)
+        .addExpr("c_integer_1 > c_integer_2", false)
+
+        .addExpr("c_bigint_2 > c_bigint_1", true)
+        .addExpr("c_bigint_1 > c_bigint_1", false)
+        .addExpr("c_bigint_1 > c_bigint_2", false)
+
+        .addExpr("c_float_2 > c_float_1", true)
+        .addExpr("c_float_1 > c_float_1", false)
+        .addExpr("c_float_1 > c_float_2", false)
+
+        .addExpr("c_double_2 > c_double_1", true)
+        .addExpr("c_double_1 > c_double_1", false)
+        .addExpr("c_double_1 > c_double_2", false)
+
+        .addExpr("c_decimal_2 > c_decimal_1", true)
+        .addExpr("c_decimal_1 > c_decimal_1", false)
+        .addExpr("c_decimal_1 > c_decimal_2", false)
+
+        .addExpr("c_varchar_2 > c_varchar_1", true)
+        .addExpr("c_varchar_1 > c_varchar_1", false)
+        .addExpr("c_varchar_1 > c_varchar_2", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testGreaterThanException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false > c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testGreaterThanOrEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 >= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 >= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 >= c_tinyint_2", false)
+
+        .addExpr("c_smallint_2 >= c_smallint_1", true)
+        .addExpr("c_smallint_1 >= c_smallint_1", true)
+        .addExpr("c_smallint_1 >= c_smallint_2", false)
+
+        .addExpr("c_integer_2 >= c_integer_1", true)
+        .addExpr("c_integer_1 >= c_integer_1", true)
+        .addExpr("c_integer_1 >= c_integer_2", false)
+
+        .addExpr("c_bigint_2 >= c_bigint_1", true)
+        .addExpr("c_bigint_1 >= c_bigint_1", true)
+        .addExpr("c_bigint_1 >= c_bigint_2", false)
+
+        .addExpr("c_float_2 >= c_float_1", true)
+        .addExpr("c_float_1 >= c_float_1", true)
+        .addExpr("c_float_1 >= c_float_2", false)
+
+        .addExpr("c_double_2 >= c_double_1", true)
+        .addExpr("c_double_1 >= c_double_1", true)
+        .addExpr("c_double_1 >= c_double_2", false)
+
+        .addExpr("c_decimal_2 >= c_decimal_1", true)
+        .addExpr("c_decimal_1 >= c_decimal_1", true)
+        .addExpr("c_decimal_1 >= c_decimal_2", false)
+
+        .addExpr("c_varchar_2 >= c_varchar_1", true)
+        .addExpr("c_varchar_1 >= c_varchar_1", true)
+        .addExpr("c_varchar_1 >= c_varchar_2", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testGreaterThanOrEqualsException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false >= c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLessThan() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 < c_tinyint_1", false)
+        .addExpr("c_tinyint_1 < c_tinyint_1", false)
+        .addExpr("c_tinyint_1 < c_tinyint_2", true)
+
+        .addExpr("c_smallint_2 < c_smallint_1", false)
+        .addExpr("c_smallint_1 < c_smallint_1", false)
+        .addExpr("c_smallint_1 < c_smallint_2", true)
+
+        .addExpr("c_integer_2 < c_integer_1", false)
+        .addExpr("c_integer_1 < c_integer_1", false)
+        .addExpr("c_integer_1 < c_integer_2", true)
+
+        .addExpr("c_bigint_2 < c_bigint_1", false)
+        .addExpr("c_bigint_1 < c_bigint_1", false)
+        .addExpr("c_bigint_1 < c_bigint_2", true)
+
+        .addExpr("c_float_2 < c_float_1", false)
+        .addExpr("c_float_1 < c_float_1", false)
+        .addExpr("c_float_1 < c_float_2", true)
+
+        .addExpr("c_double_2 < c_double_1", false)
+        .addExpr("c_double_1 < c_double_1", false)
+        .addExpr("c_double_1 < c_double_2", true)
+
+        .addExpr("c_decimal_2 < c_decimal_1", false)
+        .addExpr("c_decimal_1 < c_decimal_1", false)
+        .addExpr("c_decimal_1 < c_decimal_2", true)
+
+        .addExpr("c_varchar_2 < c_varchar_1", false)
+        .addExpr("c_varchar_1 < c_varchar_1", false)
+        .addExpr("c_varchar_1 < c_varchar_2", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testLessThanException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false < c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLessThanOrEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 <= c_tinyint_1", false)
+        .addExpr("c_tinyint_1 <= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 <= c_tinyint_2", true)
+
+        .addExpr("c_smallint_2 <= c_smallint_1", false)
+        .addExpr("c_smallint_1 <= c_smallint_1", true)
+        .addExpr("c_smallint_1 <= c_smallint_2", true)
+
+        .addExpr("c_integer_2 <= c_integer_1", false)
+        .addExpr("c_integer_1 <= c_integer_1", true)
+        .addExpr("c_integer_1 <= c_integer_2", true)
+
+        .addExpr("c_bigint_2 <= c_bigint_1", false)
+        .addExpr("c_bigint_1 <= c_bigint_1", true)
+        .addExpr("c_bigint_1 <= c_bigint_2", true)
+
+        .addExpr("c_float_2 <= c_float_1", false)
+        .addExpr("c_float_1 <= c_float_1", true)
+        .addExpr("c_float_1 <= c_float_2", true)
+
+        .addExpr("c_double_2 <= c_double_1", false)
+        .addExpr("c_double_1 <= c_double_1", true)
+        .addExpr("c_double_1 <= c_double_2", true)
+
+        .addExpr("c_decimal_2 <= c_decimal_1", false)
+        .addExpr("c_decimal_1 <= c_decimal_1", true)
+        .addExpr("c_decimal_1 <= c_decimal_2", true)
+
+        .addExpr("c_varchar_2 <= c_varchar_1", false)
+        .addExpr("c_varchar_1 <= c_varchar_1", true)
+        .addExpr("c_varchar_1 <= c_varchar_2", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testLessThanOrEqualsException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false <= c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testIsNullAndIsNotNull() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 IS NOT NULL", true)
+        .addExpr("NULL IS NOT NULL", false)
+
+        .addExpr("1 IS NULL", false)
+        .addExpr("NULL IS NULL", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Override protected PCollection<BeamSqlRow> getTestPCollection() {
+    BeamSqlRowType type = BeamSqlRowType.create(
+        Arrays.asList(
+            "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
+            "c_smallint_0", "c_smallint_1", "c_smallint_2",
+            "c_integer_0", "c_integer_1", "c_integer_2",
+            "c_bigint_0", "c_bigint_1", "c_bigint_2",
+            "c_float_0", "c_float_1", "c_float_2",
+            "c_double_0", "c_double_1", "c_double_2",
+            "c_decimal_0", "c_decimal_1", "c_decimal_2",
+            "c_varchar_0", "c_varchar_1", "c_varchar_2",
+            "c_boolean_false", "c_boolean_true"
+            ),
+        Arrays.asList(
+            Types.TINYINT, Types.TINYINT, Types.TINYINT,
+            Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+            Types.INTEGER, Types.INTEGER, Types.INTEGER,
+            Types.BIGINT, Types.BIGINT, Types.BIGINT,
+            Types.FLOAT, Types.FLOAT, Types.FLOAT,
+            Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+            Types.DECIMAL, Types.DECIMAL, Types.DECIMAL,
+            Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
+            Types.BOOLEAN, Types.BOOLEAN
+        )
+    );
+    try {
+      return MockedBoundedTable
+          .of(type)
+          .addRows(
+              (byte) 0, (byte) 1, (byte) 2,
+              (short) 0, (short) 1, (short) 2,
+              0, 1, 2,
+              0L, 1L, 2L,
+              0.0f, 1.0f, 2.0f,
+              0.0, 1.0, 2.0,
+              BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE),
+              "a", "b", "c",
+              false, true
+          )
+          .buildIOReader(pipeline)
+          .setCoder(new BeamSqlRowCoder(type));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..f4416ce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for conditional functions ({@code CASE}, {@code NULLIF}, {@code COALESCE}).
+ */
+public class BeamSqlConditionalFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+    @Test
+    public void testConditionalFunctions() throws Exception {
+      ExpressionChecker checker = new ExpressionChecker()
+          .addExpr(
+              "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END",
+              "hello"
+          )
+          .addExpr(
+              "CASE 2 "
+                  + "WHEN 1 THEN 'hello' "
+                  + "WHEN 3 THEN 'bond' "
+                  + "ELSE 'world' END",
+              "world"
+          )
+          .addExpr(
+              "CASE "
+                  + "WHEN 1 = 1 THEN 'hello' "
+                  + "ELSE 'world' END",
+              "hello"
+          )
+          .addExpr(
+              "CASE "
+                  + "WHEN 1 > 1 THEN 'hello' "
+                  + "ELSE 'world' END",
+              "world"
+          )
+          .addExpr("NULLIF(5, 4) ", 5)
+          .addExpr("COALESCE(1, 5) ", 1)
+          .addExpr("COALESCE(NULL, 5) ", 5)
+          ;
+
+      checker.buildRunAndCheck();
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
new file mode 100644
index 0000000..181c991
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for date functions.
+ */
+public class BeamSqlDateFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  @Test public void testDateTimeFunctions() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
+        .addExpr("YEAR(ts)", 1986L)
+        .addExpr("QUARTER(ts)", 1L)
+        .addExpr("MONTH(ts)", 2L)
+        .addExpr("WEEK(ts)", 7L)
+        .addExpr("DAYOFMONTH(ts)", 15L)
+        .addExpr("DAYOFYEAR(ts)", 46L)
+        .addExpr("DAYOFWEEK(ts)", 7L)
+        .addExpr("HOUR(ts)", 11L)
+        .addExpr("MINUTE(ts)", 35L)
+        .addExpr("SECOND(ts)", 26L)
+        .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00"))
+        .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00"))
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test public void testDateTimeFunctions_currentTime() throws Exception {
+    String sql = "SELECT "
+        + "LOCALTIME as l,"
+        + "LOCALTIMESTAMP as l1,"
+        + "CURRENT_DATE as c1,"
+        + "CURRENT_TIME as c2,"
+        + "CURRENT_TIMESTAMP as c3"
+        + " FROM PCOLLECTION"
+        ;
+    PCollection<BeamSqlRow> rows = getTestPCollection().apply(
+        BeamSql.simpleQuery(sql));
+    PAssert.that(rows).satisfies(new Checker());
+    pipeline.run();
+  }
+
+  private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+    @Override public Void apply(Iterable<BeamSqlRow> input) {
+      Iterator<BeamSqlRow> iter = input.iterator();
+      assertTrue(iter.hasNext());
+      BeamSqlRow row = iter.next();
+      // Every column (LOCALTIME .. CURRENT_TIMESTAMP) must be less than 1s older than now.
+      Date date = new Date();
+      assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(1).getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(2).getTime() < 1000);
+      assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000);
+      assertTrue(date.getTime() - row.getDate(4).getTime() < 1000);
+      assertFalse(iter.hasNext());
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..b408d78
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for the logical operators {@code AND}, {@code OR} and {@code NOT}.
+ */
+public class BeamSqlLogicalFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  @Test
+  public void testStringFunctions() throws Exception { // NOTE(review): misnamed; tests AND/OR/NOT
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_integer = 1 AND c_bigint = 1", true)
+        .addExpr("c_integer = 1 OR c_bigint = 2", true)
+        .addExpr("NOT c_bigint = 2", true)
+        .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true)
+        .addExpr("c_integer = 2 AND c_bigint = 1", false)
+        .addExpr("c_integer = 2 OR c_bigint = 2", false)
+        .addExpr("NOT c_bigint = 1", false)
+        .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
new file mode 100644
index 0000000..995caaf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.junit.Test;
+
+/**
+ * Integration test for built-in MATH functions.
+ */
+public class BeamSqlMathFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  private static final int INTEGER_VALUE = 1; // value the tests assume for c_integer
+  private static final long LONG_VALUE = 1L; // value the tests assume for c_bigint
+  private static final short SHORT_VALUE = 1; // value the tests assume for c_smallint
+  private static final byte BYTE_VALUE = 1; // value the tests assume for c_tinyint
+  private static final double DOUBLE_VALUE = 1.0; // value the tests assume for c_double
+  private static final float FLOAT_VALUE = 1.0f; // value the tests assume for c_float
+  private static final BigDecimal DECIMAL_VALUE = new BigDecimal(1); // assumed c_decimal value
+
+  /** Checks ABS for every numeric column type against the equivalent Java computation. */
+  @Test
+  public void testAbs() throws Exception {
+    // Casts undo Math.abs's widening to int; BigDecimal#abs keeps the decimal exact.
+    new ExpressionChecker()
+        .addExpr("ABS(c_integer)", Math.abs(INTEGER_VALUE))
+        .addExpr("ABS(c_bigint)", Math.abs(LONG_VALUE))
+        .addExpr("ABS(c_smallint)", (short) Math.abs(SHORT_VALUE))
+        .addExpr("ABS(c_tinyint)", (byte) Math.abs(BYTE_VALUE))
+        .addExpr("ABS(c_double)", Math.abs(DOUBLE_VALUE))
+        .addExpr("ABS(c_float)", Math.abs(FLOAT_VALUE))
+        .addExpr("ABS(c_decimal)", DECIMAL_VALUE.abs())
+        .buildRunAndCheck();
+  }
+
+  /** Checks SQRT for every numeric column type against {@link Math#sqrt(double)}. */
+  @Test
+  public void testSqrt() throws Exception {
+    // Every expected value is a double, whatever the operand's column type.
+    new ExpressionChecker()
+        .addExpr("SQRT(c_integer)", Math.sqrt(INTEGER_VALUE))
+        .addExpr("SQRT(c_bigint)", Math.sqrt(LONG_VALUE))
+        .addExpr("SQRT(c_smallint)", Math.sqrt(SHORT_VALUE))
+        .addExpr("SQRT(c_tinyint)", Math.sqrt(BYTE_VALUE))
+        .addExpr("SQRT(c_double)", Math.sqrt(DOUBLE_VALUE))
+        .addExpr("SQRT(c_float)", Math.sqrt(FLOAT_VALUE))
+        .addExpr("SQRT(c_decimal)", Math.sqrt(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks ROUND to zero decimal places for every numeric column type. */
+  @Test
+  public void testRound() throws Exception {
+    // Expected values come from Calcite's own sround helper. The decimal case uses its
+    // BigDecimal overload so the value is never squeezed through a double.
+    new ExpressionChecker()
+        .addExpr("ROUND(c_integer, 0)", SqlFunctions.sround(INTEGER_VALUE, 0))
+        .addExpr("ROUND(c_bigint, 0)", SqlFunctions.sround(LONG_VALUE, 0))
+        .addExpr("ROUND(c_smallint, 0)", (short) SqlFunctions.sround(SHORT_VALUE, 0))
+        .addExpr("ROUND(c_tinyint, 0)", (byte) SqlFunctions.sround(BYTE_VALUE, 0))
+        .addExpr("ROUND(c_double, 0)", SqlFunctions.sround(DOUBLE_VALUE, 0))
+        .addExpr("ROUND(c_float, 0)", (float) SqlFunctions.sround(FLOAT_VALUE, 0))
+        .addExpr("ROUND(c_decimal, 0)", SqlFunctions.sround(DECIMAL_VALUE, 0))
+        .buildRunAndCheck();
+  }
+
+  /** Checks LN for every numeric column type against {@link Math#log(double)}. */
+  @Test
+  public void testLn() throws Exception {
+    // Math.log is the natural logarithm, which is what the expectations use for LN.
+    new ExpressionChecker()
+        .addExpr("LN(c_integer)", Math.log(INTEGER_VALUE))
+        .addExpr("LN(c_bigint)", Math.log(LONG_VALUE))
+        .addExpr("LN(c_smallint)", Math.log(SHORT_VALUE))
+        .addExpr("LN(c_tinyint)", Math.log(BYTE_VALUE))
+        .addExpr("LN(c_double)", Math.log(DOUBLE_VALUE))
+        .addExpr("LN(c_float)", Math.log(FLOAT_VALUE))
+        .addExpr("LN(c_decimal)", Math.log(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks LOG10 for every numeric column type against {@link Math#log10(double)}. */
+  @Test
+  public void testLog10() throws Exception {
+    // Every expected value is a double, whatever the operand's column type.
+    new ExpressionChecker()
+        .addExpr("LOG10(c_integer)", Math.log10(INTEGER_VALUE))
+        .addExpr("LOG10(c_bigint)", Math.log10(LONG_VALUE))
+        .addExpr("LOG10(c_smallint)", Math.log10(SHORT_VALUE))
+        .addExpr("LOG10(c_tinyint)", Math.log10(BYTE_VALUE))
+        .addExpr("LOG10(c_double)", Math.log10(DOUBLE_VALUE))
+        .addExpr("LOG10(c_float)", Math.log10(FLOAT_VALUE))
+        .addExpr("LOG10(c_decimal)", Math.log10(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks EXP for every numeric column type against {@link Math#exp(double)}. */
+  @Test
+  public void testExp() throws Exception {
+    // Math.exp raises e to the operand, which is what the expectations use for EXP.
+    new ExpressionChecker()
+        .addExpr("EXP(c_integer)", Math.exp(INTEGER_VALUE))
+        .addExpr("EXP(c_bigint)", Math.exp(LONG_VALUE))
+        .addExpr("EXP(c_smallint)", Math.exp(SHORT_VALUE))
+        .addExpr("EXP(c_tinyint)", Math.exp(BYTE_VALUE))
+        .addExpr("EXP(c_double)", Math.exp(DOUBLE_VALUE))
+        .addExpr("EXP(c_float)", Math.exp(FLOAT_VALUE))
+        .addExpr("EXP(c_decimal)", Math.exp(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks ACOS for every numeric column type against {@link Math#acos(double)}. */
+  @Test
+  public void testAcos() throws Exception {
+    // The shared operand value 1 sits on the upper edge of ACOS's domain [-1, 1].
+    new ExpressionChecker()
+        .addExpr("ACOS(c_integer)", Math.acos(INTEGER_VALUE))
+        .addExpr("ACOS(c_bigint)", Math.acos(LONG_VALUE))
+        .addExpr("ACOS(c_smallint)", Math.acos(SHORT_VALUE))
+        .addExpr("ACOS(c_tinyint)", Math.acos(BYTE_VALUE))
+        .addExpr("ACOS(c_double)", Math.acos(DOUBLE_VALUE))
+        .addExpr("ACOS(c_float)", Math.acos(FLOAT_VALUE))
+        .addExpr("ACOS(c_decimal)", Math.acos(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks ASIN for every numeric column type against {@link Math#asin(double)}. */
+  @Test
+  public void testAsin() throws Exception {
+    // The shared operand value 1 sits on the upper edge of ASIN's domain [-1, 1].
+    new ExpressionChecker()
+        .addExpr("ASIN(c_integer)", Math.asin(INTEGER_VALUE))
+        .addExpr("ASIN(c_bigint)", Math.asin(LONG_VALUE))
+        .addExpr("ASIN(c_smallint)", Math.asin(SHORT_VALUE))
+        .addExpr("ASIN(c_tinyint)", Math.asin(BYTE_VALUE))
+        .addExpr("ASIN(c_double)", Math.asin(DOUBLE_VALUE))
+        .addExpr("ASIN(c_float)", Math.asin(FLOAT_VALUE))
+        .addExpr("ASIN(c_decimal)", Math.asin(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks ATAN for every numeric column type against {@link Math#atan(double)}. */
+  @Test
+  public void testAtan() throws Exception {
+    // Every expected value is a double, whatever the operand's column type.
+    new ExpressionChecker()
+        .addExpr("ATAN(c_integer)", Math.atan(INTEGER_VALUE))
+        .addExpr("ATAN(c_bigint)", Math.atan(LONG_VALUE))
+        .addExpr("ATAN(c_smallint)", Math.atan(SHORT_VALUE))
+        .addExpr("ATAN(c_tinyint)", Math.atan(BYTE_VALUE))
+        .addExpr("ATAN(c_double)", Math.atan(DOUBLE_VALUE))
+        .addExpr("ATAN(c_float)", Math.atan(FLOAT_VALUE))
+        .addExpr("ATAN(c_decimal)", Math.atan(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks COT for every numeric column type against the reciprocal of the tangent. */
+  @Test
+  public void testCot() throws Exception {
+    // java.lang.Math has no cotangent, so each expectation is 1 / Math.tan(value).
+    new ExpressionChecker()
+        .addExpr("COT(c_integer)", 1.0d / Math.tan(INTEGER_VALUE))
+        .addExpr("COT(c_bigint)", 1.0d / Math.tan(LONG_VALUE))
+        .addExpr("COT(c_smallint)", 1.0d / Math.tan(SHORT_VALUE))
+        .addExpr("COT(c_tinyint)", 1.0d / Math.tan(BYTE_VALUE))
+        .addExpr("COT(c_double)", 1.0d / Math.tan(DOUBLE_VALUE))
+        .addExpr("COT(c_float)", 1.0d / Math.tan(FLOAT_VALUE))
+        .addExpr("COT(c_decimal)", 1.0d / Math.tan(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks DEGREES for every numeric column type against {@link Math#toDegrees(double)}. */
+  @Test
+  public void testDegrees() throws Exception {
+    // The column value is treated as an angle in radians and converted to degrees.
+    new ExpressionChecker()
+        .addExpr("DEGREES(c_integer)", Math.toDegrees(INTEGER_VALUE))
+        .addExpr("DEGREES(c_bigint)", Math.toDegrees(LONG_VALUE))
+        .addExpr("DEGREES(c_smallint)", Math.toDegrees(SHORT_VALUE))
+        .addExpr("DEGREES(c_tinyint)", Math.toDegrees(BYTE_VALUE))
+        .addExpr("DEGREES(c_double)", Math.toDegrees(DOUBLE_VALUE))
+        .addExpr("DEGREES(c_float)", Math.toDegrees(FLOAT_VALUE))
+        .addExpr("DEGREES(c_decimal)", Math.toDegrees(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks RADIANS for every numeric column type against {@link Math#toRadians(double)}. */
+  @Test
+  public void testRadians() throws Exception {
+    // The column value is treated as an angle in degrees and converted to radians.
+    new ExpressionChecker()
+        .addExpr("RADIANS(c_integer)", Math.toRadians(INTEGER_VALUE))
+        .addExpr("RADIANS(c_bigint)", Math.toRadians(LONG_VALUE))
+        .addExpr("RADIANS(c_smallint)", Math.toRadians(SHORT_VALUE))
+        .addExpr("RADIANS(c_tinyint)", Math.toRadians(BYTE_VALUE))
+        .addExpr("RADIANS(c_double)", Math.toRadians(DOUBLE_VALUE))
+        .addExpr("RADIANS(c_float)", Math.toRadians(FLOAT_VALUE))
+        .addExpr("RADIANS(c_decimal)", Math.toRadians(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks COS for every numeric column type against {@link Math#cos(double)}. */
+  @Test
+  public void testCos() throws Exception {
+    // Math.cos takes its angle in radians; the column value is passed through as is.
+    new ExpressionChecker()
+        .addExpr("COS(c_integer)", Math.cos(INTEGER_VALUE))
+        .addExpr("COS(c_bigint)", Math.cos(LONG_VALUE))
+        .addExpr("COS(c_smallint)", Math.cos(SHORT_VALUE))
+        .addExpr("COS(c_tinyint)", Math.cos(BYTE_VALUE))
+        .addExpr("COS(c_double)", Math.cos(DOUBLE_VALUE))
+        .addExpr("COS(c_float)", Math.cos(FLOAT_VALUE))
+        .addExpr("COS(c_decimal)", Math.cos(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks SIN for every numeric column type against {@link Math#sin(double)}. */
+  @Test
+  public void testSin() throws Exception {
+    // Math.sin takes its angle in radians; the column value is passed through as is.
+    new ExpressionChecker()
+        .addExpr("SIN(c_integer)", Math.sin(INTEGER_VALUE))
+        .addExpr("SIN(c_bigint)", Math.sin(LONG_VALUE))
+        .addExpr("SIN(c_smallint)", Math.sin(SHORT_VALUE))
+        .addExpr("SIN(c_tinyint)", Math.sin(BYTE_VALUE))
+        .addExpr("SIN(c_double)", Math.sin(DOUBLE_VALUE))
+        .addExpr("SIN(c_float)", Math.sin(FLOAT_VALUE))
+        .addExpr("SIN(c_decimal)", Math.sin(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks TAN for every numeric column type against {@link Math#tan(double)}. */
+  @Test
+  public void testTan() throws Exception {
+    // Math.tan takes its angle in radians; the column value is passed through as is.
+    new ExpressionChecker()
+        .addExpr("TAN(c_integer)", Math.tan(INTEGER_VALUE))
+        .addExpr("TAN(c_bigint)", Math.tan(LONG_VALUE))
+        .addExpr("TAN(c_smallint)", Math.tan(SHORT_VALUE))
+        .addExpr("TAN(c_tinyint)", Math.tan(BYTE_VALUE))
+        .addExpr("TAN(c_double)", Math.tan(DOUBLE_VALUE))
+        .addExpr("TAN(c_float)", Math.tan(FLOAT_VALUE))
+        .addExpr("TAN(c_decimal)", Math.tan(DECIMAL_VALUE.doubleValue()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks SIGN for every numeric column type, expecting a result of the operand's type. */
+  @Test
+  public void testSign() throws Exception {
+    // Integer.signum and Long.signum return int, so casts restore the column's Java type.
+    new ExpressionChecker()
+        .addExpr("SIGN(c_integer)", Integer.signum(INTEGER_VALUE))
+        .addExpr("SIGN(c_bigint)", (long) (Long.signum(LONG_VALUE)))
+        .addExpr("SIGN(c_smallint)", (short) (Integer.signum(SHORT_VALUE)))
+        .addExpr("SIGN(c_tinyint)", (byte) Integer.signum(BYTE_VALUE))
+        .addExpr("SIGN(c_double)", Math.signum(DOUBLE_VALUE))
+        .addExpr("SIGN(c_float)", Math.signum(FLOAT_VALUE))
+        .addExpr("SIGN(c_decimal)", BigDecimal.valueOf(DECIMAL_VALUE.signum()))
+        .buildRunAndCheck();
+  }
+
+  /** Checks POWER with exponent 2 for every numeric column type against {@link Math#pow}. */
+  @Test
+  public void testPower() throws Exception {
+    // The exponent is the literal 2 in both the SQL text and the Java expectation.
+    new ExpressionChecker()
+        .addExpr("POWER(c_integer, 2)", Math.pow(INTEGER_VALUE, 2))
+        .addExpr("POWER(c_bigint, 2)", Math.pow(LONG_VALUE, 2))
+        .addExpr("POWER(c_smallint, 2)", Math.pow(SHORT_VALUE, 2))
+        .addExpr("POWER(c_tinyint, 2)", Math.pow(BYTE_VALUE, 2))
+        .addExpr("POWER(c_double, 2)", Math.pow(DOUBLE_VALUE, 2))
+        .addExpr("POWER(c_float, 2)", Math.pow(FLOAT_VALUE, 2))
+        .addExpr("POWER(c_decimal, 2)", Math.pow(DECIMAL_VALUE.doubleValue(), 2))
+        .buildRunAndCheck();
+  }
+
+  /** Checks the PI constant against {@link Math#PI}. */
+  @Test
+  public void testPi() throws Exception {
+    // PI is written as a bare keyword, without parentheses or arguments.
+    new ExpressionChecker()
+        .addExpr("PI", Math.PI)
+        .buildRunAndCheck();
+  }
+
+  /** Checks ATAN2 for every numeric column type against {@link Math#atan2(double, double)}. */
+  @Test
+  public void testAtan2() throws Exception {
+    // The second argument is the literal 2 in both the SQL text and the Java expectation.
+    new ExpressionChecker()
+        .addExpr("ATAN2(c_integer, 2)", Math.atan2(INTEGER_VALUE, 2))
+        .addExpr("ATAN2(c_bigint, 2)", Math.atan2(LONG_VALUE, 2))
+        .addExpr("ATAN2(c_smallint, 2)", Math.atan2(SHORT_VALUE, 2))
+        .addExpr("ATAN2(c_tinyint, 2)", Math.atan2(BYTE_VALUE, 2))
+        .addExpr("ATAN2(c_double, 2)", Math.atan2(DOUBLE_VALUE, 2))
+        .addExpr("ATAN2(c_float, 2)", Math.atan2(FLOAT_VALUE, 2))
+        .addExpr("ATAN2(c_decimal, 2)", Math.atan2(DECIMAL_VALUE.doubleValue(), 2))
+        .buildRunAndCheck();
+  }
+
+  /** Checks TRUNCATE to two decimal places for every numeric column type. */
+  @Test
+  public void testTruncate() throws Exception {
+    // Expected values come from Calcite's struncate; casts restore the column's Java type.
+    new ExpressionChecker()
+        .addExpr("TRUNCATE(c_integer, 2)", SqlFunctions.struncate(INTEGER_VALUE, 2))
+        .addExpr("TRUNCATE(c_bigint, 2)", SqlFunctions.struncate(LONG_VALUE, 2))
+        .addExpr("TRUNCATE(c_smallint, 2)", (short) SqlFunctions.struncate(SHORT_VALUE, 2))
+        .addExpr("TRUNCATE(c_tinyint, 2)", (byte) SqlFunctions.struncate(BYTE_VALUE, 2))
+        .addExpr("TRUNCATE(c_double, 2)", SqlFunctions.struncate(DOUBLE_VALUE, 2))
+        .addExpr("TRUNCATE(c_float, 2)", (float) SqlFunctions.struncate(FLOAT_VALUE, 2))
+        .addExpr("TRUNCATE(c_decimal, 2)", SqlFunctions.struncate(DECIMAL_VALUE, 2))
+        .buildRunAndCheck();
+  }
+
+  /** Checks seeded RAND against a {@link Random} created with the same seed. */
+  @Test
+  public void testRand() throws Exception {
+    // A seeded Random is deterministic, so its first double is a fixed expectation.
+    new ExpressionChecker()
+        .addExpr("RAND(c_integer)", new Random(INTEGER_VALUE).nextDouble())
+        .buildRunAndCheck();
+  }
+
+  /** Checks seeded RAND_INTEGER against {@link Random#nextInt(int)} with the same seed. */
+  @Test
+  public void testRandInteger() throws Exception {
+    // The column supplies both the seed and the bound (first and second argument).
+    new ExpressionChecker()
+        .addExpr("RAND_INTEGER(c_integer, c_integer)",
+            new Random(INTEGER_VALUE).nextInt(INTEGER_VALUE))
+        .buildRunAndCheck();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
new file mode 100644
index 0000000..7a51a95
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for string functions.
+ */
+public class BeamSqlStringFunctionsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+  /** Evaluates the string built-ins on literal arguments and checks each result. */
+  @Test
+  public void testStringFunctions() throws Exception {
+    new ExpressionChecker()
+        // Concatenation, length and case conversion.
+        .addExpr("'hello' || ' world'", "hello world")
+        .addExpr("CHAR_LENGTH('hello')", 5)
+        .addExpr("CHARACTER_LENGTH('hello')", 5)
+        .addExpr("UPPER('hello')", "HELLO")
+        .addExpr("LOWER('HELLO')", "hello")
+        // NOTE(review): 0-based results; standard SQL POSITION is 1-based (6, 11) - confirm.
+        .addExpr("POSITION('world' IN 'helloworld')", 5)
+        .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+        .addExpr("TRIM(' hello ')", "hello")
+        .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
+        .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
+        .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
+        // Replacement, slicing and capitalisation.
+        .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
+        .addExpr("SUBSTRING('hello' FROM 2)", "ello")
+        .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
+        .addExpr("INITCAP('hello world')", "Hello World")
+        .buildRunAndCheck();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
new file mode 100644
index 0000000..2843e41
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSqlFnExecutor}.
+ */
+public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
+
+  /** A two-branch AND filter must be translated into the matching expression tree. */
+  @Test
+  public void testBeamFilterRel() {
+    // Condition under test: (input 0 <= 1000) AND (input 1 = 0).
+    RexNode lessOrEqual = rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+        Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+            rexBuilder.makeBigintLiteral(new BigDecimal(1000L))));
+    RexNode equalsZero = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+            rexBuilder.makeExactLiteral(new BigDecimal(0))));
+    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(lessOrEqual, equalsZero));
+
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(
+        new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null, condition));
+    executor.prepare();
+
+    // The whole condition becomes one boolean AND node with two children.
+    Assert.assertEquals(1, executor.exps.size());
+    BeamSqlExpression root = executor.exps.get(0);
+    assertTrue(root instanceof BeamSqlAndExpression);
+    Assert.assertEquals(SqlTypeName.BOOLEAN, root.getOutputType());
+    Assert.assertEquals(2, root.getOperands().size());
+
+    BeamSqlExpression lhs = (BeamSqlExpression) root.getOperands().get(0);
+    BeamSqlExpression rhs = (BeamSqlExpression) root.getOperands().get(1);
+    assertTrue(lhs instanceof BeamSqlLessThanOrEqualsExpression);
+    assertTrue(rhs instanceof BeamSqlEqualsExpression);
+
+    // Each comparison pairs an input reference with a literal, in that order.
+    Assert.assertEquals(2, lhs.getOperands().size());
+    BeamSqlExpression lhsRef = (BeamSqlExpression) lhs.getOperands().get(0);
+    BeamSqlExpression lhsLiteral = (BeamSqlExpression) lhs.getOperands().get(1);
+    assertTrue(lhsRef instanceof BeamSqlInputRefExpression);
+    assertTrue(lhsLiteral instanceof BeamSqlPrimitive);
+
+    Assert.assertEquals(2, rhs.getOperands().size());
+    BeamSqlExpression rhsRef = (BeamSqlExpression) rhs.getOperands().get(0);
+    BeamSqlExpression rhsLiteral = (BeamSqlExpression) rhs.getOperands().get(1);
+    assertTrue(rhsRef instanceof BeamSqlInputRefExpression);
+    assertTrue(rhsLiteral instanceof BeamSqlPrimitive);
+  }
+
+  /** An identity projection over four fields must yield four input-reference expressions. */
+  @Test
+  public void testBeamProjectRel() {
+    BeamRelNode projection = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
+        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+        rexBuilder.identityProjects(relDataType), relDataType);
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(projection);
+    executor.prepare();
+
+    Assert.assertEquals(4, executor.exps.size());
+    for (BeamSqlExpression projected : executor.exps) {
+      assertTrue(projected instanceof BeamSqlInputRefExpression);
+    }
+  }
+
+
+  /**
+   * AND, OR and NOT calls over boolean literals must each map to their own expression class.
+   */
+  @Test
+  public void testBuildExpression_logical() {
+    // AND
+    RexNode andCall = rexBuilder.makeCall(
+        SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)));
+    BeamSqlExpression andExp = BeamSqlFnExecutor.buildExpression(andCall);
+    assertTrue(andExp instanceof BeamSqlAndExpression);
+
+    // OR
+    RexNode orCall = rexBuilder.makeCall(
+        SqlStdOperatorTable.OR,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)));
+    BeamSqlExpression orExp = BeamSqlFnExecutor.buildExpression(orCall);
+    assertTrue(orExp instanceof BeamSqlOrExpression);
+
+    // NOT, the only single-operand case
+    RexNode notCall = rexBuilder.makeCall(
+        SqlStdOperatorTable.NOT,
+        Arrays.asList(rexBuilder.makeLiteral(true)));
+    BeamSqlExpression notExp = BeamSqlFnExecutor.buildExpression(notCall);
+    assertTrue(notExp instanceof BeamSqlNotExpression);
+  }
+
+  /** AND with a non-boolean operand must fail; despite the name, OR is not exercised here. */
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_andOr_invalidOperand() {
+    // The string literal is the offending operand.
+    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  /** NOT with a non-boolean operand must fail with {@link IllegalStateException}. */
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperand() {
+    // A string literal is supplied where the valid tests pass a boolean literal.
+    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+
+  /** NOT with two operands must fail with {@link IllegalStateException}. */
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperandCount() {
+    // Both operands are valid booleans; only their number is wrong.
+    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(true)
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  /** Each arithmetic operator must map to its dedicated expression class. */
+  @Test
+  public void testBuildExpression_arithmetic() {
+    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+  }
+
+  /** Builds {@code fn(1, 1)} over BIGINT literals and checks the built expression's class. */
+  private void testBuildArithmeticExpression(SqlOperator fn,
+      Class<? extends BeamSqlExpression> clazz) {
+    RexNode rexNode = rexBuilder.makeCall(fn, Arrays.asList(
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+    ));
+    BeamSqlExpression exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    // assertEquals reports both classes on a mismatch, unlike assertTrue(a.equals(b)).
+    Assert.assertEquals(clazz, exp.getClass());
+  }
+
+  /** Every string operator listed below, plus CASE, must map to its expression class. */
+  @Test
+  public void testBuildExpression_string() {
+    RexNode call;
+    BeamSqlExpression built;
+    // CONCAT
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.CONCAT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello "),
+            rexBuilder.makeLiteral("world")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlConcatExpression);
+
+    // POSITION with two operands
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlPositionExpression);
+
+    // POSITION with a third operand, a BIGINT literal cast to INTEGER
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello"),
+            rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+                rexBuilder.makeBigintLiteral(BigDecimal.ONE))));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlPositionExpression);
+
+    // CHAR_LENGTH
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.CHAR_LENGTH,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlCharLengthExpression);
+
+    // UPPER
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.UPPER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlUpperExpression);
+
+    // LOWER
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.LOWER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlLowerExpression);
+
+    // INITCAP
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.INITCAP,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlInitCapExpression);
+
+    // TRIM with an explicit BOTH flag
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.TRIM,
+        Arrays.asList(
+            rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlTrimExpression);
+
+    // SUBSTRING with two operands
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlSubstringExpression);
+
+    // SUBSTRING with three operands
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlSubstringExpression);
+
+    // OVERLAY with three operands
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlOverlayExpression);
+
+    // OVERLAY with four operands
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlOverlayExpression);
+
+    // CASE is not a string function as such; it is covered here with
+    // string-valued branches.
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.CASE,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlCaseExpression);
+  }
+
+  /**
+   * Date and time operators must each map to their expression class.
+   */
+  @Test
+  public void testBuildExpression_date() {
+    RexNode call;
+    BeamSqlExpression built;
+    // Any instant will do: only the class of each built expression is asserted,
+    // never a computed value. The calendar is created in GMT and set to "now".
+    Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
+    calendar.setTime(new Date());
+
+    // CEIL
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.CEIL,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlDateCeilExpression);
+
+    // FLOOR
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.FLOOR,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlDateFloorExpression);
+
+    // EXTRACT takes the time unit first, the reverse of CEIL and FLOOR above.
+    // Open question carried over from the original author: EXTRACT == EXTRACT_DATE?
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.EXTRACT,
+        Arrays.asList(
+            rexBuilder.makeFlag(TimeUnitRange.MONTH),
+            rexBuilder.makeDateLiteral(calendar)));
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlExtractExpression);
+
+    // CURRENT_DATE
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.CURRENT_DATE,
+        Arrays.<RexNode>asList());
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlCurrentDateExpression);
+
+    // LOCALTIME
+    // (expected to be represented by the "current time" expression class)
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.LOCALTIME,
+        Arrays.<RexNode>asList());
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlCurrentTimeExpression);
+
+    // LOCALTIMESTAMP
+    call = rexBuilder.makeCall(
+        SqlStdOperatorTable.LOCALTIMESTAMP,
+        Arrays.<RexNode>asList());
+    built = BeamSqlFnExecutor.buildExpression(call);
+    assertTrue(built instanceof BeamSqlCurrentTimestampExpression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
new file mode 100644
index 0000000..c6478a6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.planner.BeamRuleSets;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * Shared fixture for tests of {@link BeamSqlFnExecutor} and of {@link BeamSqlExpression}
+ * subclasses: a four-column sample {@link #record} plus Calcite builders, set up once per class.
+ */
+public class BeamSqlFnExecutorTestBase {
+  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
+
+  public static final JavaTypeFactory TYPE_FACTORY =
+      new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+  public static RelDataType relDataType;
+  public static BeamSqlRowType beamRowType;
+  public static BeamSqlRow record;
+  public static RelBuilder relBuilder;
+
+  /** Populates the static row type, sample row and {@link RelBuilder} before any test runs. */
+  @BeforeClass
+  public static void prepare() {
+    // Row layout: order_id BIGINT, site_id INTEGER, price DOUBLE, order_time BIGINT.
+    relDataType = TYPE_FACTORY.builder().add("order_id", SqlTypeName.BIGINT)
+        .add("site_id", SqlTypeName.INTEGER).add("price", SqlTypeName.DOUBLE)
+        .add("order_time", SqlTypeName.BIGINT).build();
+    beamRowType = CalciteUtils.toBeamRowType(relDataType);
+
+    // One sample row; values are given in the column order declared above.
+    record = new BeamSqlRow(beamRowType);
+    record.addField(0, 1234567L);
+    record.addField(1, 0);
+    record.addField(2, 8.9);
+    record.addField(3, 1234567L);
+
+    final List<RelTraitDef> traits = new ArrayList<>();
+    traits.add(ConventionTraitDef.INSTANCE);
+    traits.add(RelCollationTraitDef.INSTANCE);
+    SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+    SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder().parserConfig(parserConfig)
+        .defaultSchema(rootSchema).traitDefs(traits).context(Contexts.EMPTY_CONTEXT)
+        .ruleSets(BeamRuleSets.getRuleSets()).costFactory(null)
+        .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+
+    relBuilder = RelBuilder.create(frameworkConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
new file mode 100644
index 0000000..7bfbe20
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlIsNullExpression} and {@link BeamSqlIsNotNullExpression},
+ * each evaluated on a populated input field and on a null literal.
+ */
+public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testIsNull() {
+    // Field 0 of the inherited record holds a value, so IS NULL is expected to be false.
+    BeamSqlInputRefExpression fieldRef = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    Assert.assertEquals(false, new BeamSqlIsNullExpression(fieldRef).evaluate(record).getValue());
+
+    BeamSqlExpression nullLiteral = BeamSqlPrimitive.of(SqlTypeName.BIGINT, null);
+    BeamSqlIsNullExpression onNull = new BeamSqlIsNullExpression(nullLiteral);
+    Assert.assertEquals(true, onNull.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testIsNotNull() {
+    // Mirror image of testIsNull: the populated field is not null, the null literal is.
+    BeamSqlInputRefExpression fieldRef = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    BeamSqlIsNotNullExpression onField = new BeamSqlIsNotNullExpression(fieldRef);
+    Assert.assertEquals(true, onField.evaluate(record).getValue());
+
+    BeamSqlExpression nullLiteral = BeamSqlPrimitive.of(SqlTypeName.BIGINT, null);
+    Assert.assertEquals(false,
+        new BeamSqlIsNotNullExpression(nullLiteral).evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
new file mode 100644
index 0000000..b6f65a1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlAndExpression} and {@link BeamSqlOrExpression}: each is
+ * evaluated on a uniform operand list and again after one opposing operand is appended.
+ */
+public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  /** Wraps {@code value} as a BOOLEAN literal operand. */
+  private static BeamSqlExpression literal(boolean value) {
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, value);
+  }
+
+  @Test
+  public void testAnd() {
+    List<BeamSqlExpression> conjuncts = new ArrayList<>();
+    conjuncts.add(literal(true));
+    conjuncts.add(literal(true));
+    Assert.assertTrue(new BeamSqlAndExpression(conjuncts).evaluate(record).getValue());
+    // Appending one false operand is expected to make the conjunction false.
+    conjuncts.add(literal(false));
+    Assert.assertFalse(new BeamSqlAndExpression(conjuncts).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testOr() {
+    List<BeamSqlExpression> disjuncts = new ArrayList<>();
+    disjuncts.add(literal(false));
+    disjuncts.add(literal(false));
+    Assert.assertFalse(new BeamSqlOrExpression(disjuncts).evaluate(record).getValue());
+    // Appending one true operand is expected to make the disjunction true.
+    disjuncts.add(literal(true));
+    Assert.assertTrue(new BeamSqlOrExpression(disjuncts).evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
new file mode 100644
index 0000000..28ed920
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlCaseExpression}: operand validation via {@code accept()} and
+ * branch selection via {@code evaluate()}. The accepted operand lists used here are
+ * {@code when}/{@code then} pairs followed by one trailing operand.
+ */
+public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  /**
+   * Builds a CASE expression whose operand list holds {@code operands} in the given order.
+   * A new list is created on every call, so test cases never share operand state.
+   */
+  private static BeamSqlCaseExpression caseOf(BeamSqlExpression... operands) {
+    List<BeamSqlExpression> operandList = new ArrayList<>();
+    for (BeamSqlExpression operand : operands) {
+      operandList.add(operand);
+    }
+    return new BeamSqlCaseExpression(operandList);
+  }
+
+  /** Shorthand for a BOOLEAN literal operand. */
+  private static BeamSqlExpression bool(boolean value) {
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, value);
+  }
+
+  /** Shorthand for a VARCHAR literal operand. */
+  private static BeamSqlExpression text(String value) {
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, value);
+  }
+
+  @Test public void accept() throws Exception {
+    // well-formed: one `when`/`then` pair plus a trailing operand
+    assertTrue(caseOf(bool(true), text("hello"), text("world")).accept());
+
+    // even param count
+    assertFalse(caseOf(bool(true), text("world")).accept());
+
+    // `when` type error
+    assertFalse(caseOf(text("error"), text("hello"), text("world")).accept());
+
+    // `then` type mixing
+    // (VARCHAR and INTEGER results within one CASE)
+    assertFalse(caseOf(
+        bool(true), text("hello"),
+        bool(true), BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10),
+        text("hello")).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    // the only `when` is true: its `then` value is expected
+    assertEquals("hello",
+        caseOf(bool(true), text("hello"), text("world")).evaluate(record).getValue());
+
+    // the only `when` is false: the trailing operand is expected
+    assertEquals("world",
+        caseOf(bool(false), text("hello"), text("world")).evaluate(record).getValue());
+
+    // first `when` false, second true: the second `then` value is expected
+    assertEquals("hello1", caseOf(
+        bool(false), text("hello"),
+        bool(true), text("hello1"),
+        text("world")).evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..feefc45
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlCastExpression}: casts to BIGINT, DATE and TIMESTAMP.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  private List<BeamSqlExpression> operands;
+
+  @Before
+  public void setup() {
+    operands = new ArrayList<>();
+  }
+
+  /** Wraps the current {@link #operands} in a cast to {@code targetType}. */
+  private BeamSqlCastExpression newCast(SqlTypeName targetType) {
+    return new BeamSqlCastExpression(operands, targetType);
+  }
+
+  /** Returns the value of {@link #newCast} evaluated against the inherited {@code record}. */
+  private Object castTo(SqlTypeName targetType) {
+    return newCast(targetType).evaluate(record).getValue();
+  }
+
+  @Test
+  public void testForOperands() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
+    Assert.assertFalse(newCast(SqlTypeName.BIGINT).accept());
+  }
+
+  @Test
+  public void testForIntegerToBigintTypeCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertEquals(5L, newCast(SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForDoubleToBigIntCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
+    Assert.assertEquals(5L, newCast(SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForIntegerToDateCast() {
+    // integer input in yyyyMMdd form
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE));
+  }
+
+  @Test
+  public void testyyyyMMddDateFormat() {
+    // string input in yyyy-MM-dd form
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE));
+  }
+
+  @Test
+  public void testyyMMddDateFormat() {
+    // string input in yy.MM.dd form
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE));
+  }
+
+  @Test
+  public void testForTimestampCastExpression() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
+    Assert.assertEquals(SqlTypeName.TIMESTAMP,
+        newCast(SqlTypeName.TIMESTAMP).evaluate(record).getOutputType());
+  }
+
+  @Test
+  public void testDateTimeFormatWithMillis() {
+    // The expectation pins the .989 input to the following whole second.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), castTo(SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testDateTimeFormatWithTimezone() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), castTo(SqlTypeName.TIMESTAMP));
+  }
+
+  @Test
+  public void testDateTimeFormat() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"), castTo(SqlTypeName.TIMESTAMP));
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testForCastTypeNotSupported() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), castTo(SqlTypeName.TIMESTAMP));
+  }
+}