You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:48 UTC
[25/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
new file mode 100644
index 0000000..ffc6833
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import com.google.common.base.Joiner;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.util.Pair;
+import org.junit.Rule;
+
+/**
+ * Base class for all built-in functions integration tests.
+ */
+public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+ private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
+ static {
+ JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
+ JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
+ JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
+ JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
+ JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
+ JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
+ }
+
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+
+ protected PCollection<BeamSqlRow> getTestPCollection() {
+ BeamSqlRowType type = BeamSqlRowType.create(
+ Arrays.asList("ts", "c_tinyint", "c_smallint",
+ "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
+ "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
+ Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
+ Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
+ Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
+ );
+ try {
+ return MockedBoundedTable
+ .of(type)
+ .addRows(
+ parseDate("1986-02-15 11:35:26"),
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0,
+ BigDecimal.ONE,
+ (byte) 127,
+ (short) 32767,
+ 2147483647,
+ 9223372036854775807L
+ )
+ .buildIOReader(pipeline)
+ .setCoder(new BeamSqlRowCoder(type));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Date parseDate(String str) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ return sdf.parse(str);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Helper class to make write integration test for built-in functions easier.
+ *
+ * <p>example usage:
+ * <pre>{@code
+ * ExpressionChecker checker = new ExpressionChecker()
+ * .addExpr("1 + 1", 2)
+ * .addExpr("1.0 + 1", 2.0)
+ * .addExpr("1 + 1.0", 2.0)
+ * .addExpr("1.0 + 1.0", 2.0)
+ * .addExpr("c_tinyint + c_tinyint", (byte) 2);
+ * checker.buildRunAndCheck(inputCollections);
+ * }</pre>
+ */
+ public class ExpressionChecker {
+ private transient List<Pair<String, Object>> exps = new ArrayList<>();
+
+ public ExpressionChecker addExpr(String expression, Object expectedValue) {
+ exps.add(Pair.of(expression, expectedValue));
+ return this;
+ }
+
+ private String getSql() {
+ List<String> expStrs = new ArrayList<>();
+ for (Pair<String, Object> pair : exps) {
+ expStrs.add(pair.getKey());
+ }
+ return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION";
+ }
+
+ /**
+ * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
+ */
+ public void buildRunAndCheck() {
+ PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+ System.out.println("SQL:>\n" + getSql());
+ try {
+ List<String> names = new ArrayList<>();
+ List<Integer> types = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+
+ for (Pair<String, Object> pair : exps) {
+ names.add(pair.getKey());
+ types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
+ values.add(pair.getValue());
+ }
+
+ PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder
+ .of(BeamSqlRowType.create(names, types))
+ .addRows(values)
+ .getRows()
+ );
+ inputCollection.getPipeline().run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
new file mode 100644
index 0000000..14de5b6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for comparison operators.
+ */
+public class BeamSqlComparisonOperatorsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+ @Test
+ public void testEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_1 = c_tinyint_1", true)
+ .addExpr("c_tinyint_1 = c_tinyint_2", false)
+ .addExpr("c_smallint_1 = c_smallint_1", true)
+ .addExpr("c_smallint_1 = c_smallint_2", false)
+ .addExpr("c_integer_1 = c_integer_1", true)
+ .addExpr("c_integer_1 = c_integer_2", false)
+ .addExpr("c_bigint_1 = c_bigint_1", true)
+ .addExpr("c_bigint_1 = c_bigint_2", false)
+ .addExpr("c_float_1 = c_float_1", true)
+ .addExpr("c_float_1 = c_float_2", false)
+ .addExpr("c_double_1 = c_double_1", true)
+ .addExpr("c_double_1 = c_double_2", false)
+ .addExpr("c_decimal_1 = c_decimal_1", true)
+ .addExpr("c_decimal_1 = c_decimal_2", false)
+ .addExpr("c_varchar_1 = c_varchar_1", true)
+ .addExpr("c_varchar_1 = c_varchar_2", false)
+ .addExpr("c_boolean_true = c_boolean_true", true)
+ .addExpr("c_boolean_true = c_boolean_false", false)
+
+ ;
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testNotEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_1 <> c_tinyint_1", false)
+ .addExpr("c_tinyint_1 <> c_tinyint_2", true)
+ .addExpr("c_smallint_1 <> c_smallint_1", false)
+ .addExpr("c_smallint_1 <> c_smallint_2", true)
+ .addExpr("c_integer_1 <> c_integer_1", false)
+ .addExpr("c_integer_1 <> c_integer_2", true)
+ .addExpr("c_bigint_1 <> c_bigint_1", false)
+ .addExpr("c_bigint_1 <> c_bigint_2", true)
+ .addExpr("c_float_1 <> c_float_1", false)
+ .addExpr("c_float_1 <> c_float_2", true)
+ .addExpr("c_double_1 <> c_double_1", false)
+ .addExpr("c_double_1 <> c_double_2", true)
+ .addExpr("c_decimal_1 <> c_decimal_1", false)
+ .addExpr("c_decimal_1 <> c_decimal_2", true)
+ .addExpr("c_varchar_1 <> c_varchar_1", false)
+ .addExpr("c_varchar_1 <> c_varchar_2", true)
+ .addExpr("c_boolean_true <> c_boolean_true", false)
+ .addExpr("c_boolean_true <> c_boolean_false", true)
+ ;
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testGreaterThan() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 > c_tinyint_1", true)
+ .addExpr("c_tinyint_1 > c_tinyint_1", false)
+ .addExpr("c_tinyint_1 > c_tinyint_2", false)
+
+ .addExpr("c_smallint_2 > c_smallint_1", true)
+ .addExpr("c_smallint_1 > c_smallint_1", false)
+ .addExpr("c_smallint_1 > c_smallint_2", false)
+
+ .addExpr("c_integer_2 > c_integer_1", true)
+ .addExpr("c_integer_1 > c_integer_1", false)
+ .addExpr("c_integer_1 > c_integer_2", false)
+
+ .addExpr("c_bigint_2 > c_bigint_1", true)
+ .addExpr("c_bigint_1 > c_bigint_1", false)
+ .addExpr("c_bigint_1 > c_bigint_2", false)
+
+ .addExpr("c_float_2 > c_float_1", true)
+ .addExpr("c_float_1 > c_float_1", false)
+ .addExpr("c_float_1 > c_float_2", false)
+
+ .addExpr("c_double_2 > c_double_1", true)
+ .addExpr("c_double_1 > c_double_1", false)
+ .addExpr("c_double_1 > c_double_2", false)
+
+ .addExpr("c_decimal_2 > c_decimal_1", true)
+ .addExpr("c_decimal_1 > c_decimal_1", false)
+ .addExpr("c_decimal_1 > c_decimal_2", false)
+
+ .addExpr("c_varchar_2 > c_varchar_1", true)
+ .addExpr("c_varchar_1 > c_varchar_1", false)
+ .addExpr("c_varchar_1 > c_varchar_2", false)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGreaterThanException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false > c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testGreaterThanOrEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 >= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 >= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 >= c_tinyint_2", false)
+
+ .addExpr("c_smallint_2 >= c_smallint_1", true)
+ .addExpr("c_smallint_1 >= c_smallint_1", true)
+ .addExpr("c_smallint_1 >= c_smallint_2", false)
+
+ .addExpr("c_integer_2 >= c_integer_1", true)
+ .addExpr("c_integer_1 >= c_integer_1", true)
+ .addExpr("c_integer_1 >= c_integer_2", false)
+
+ .addExpr("c_bigint_2 >= c_bigint_1", true)
+ .addExpr("c_bigint_1 >= c_bigint_1", true)
+ .addExpr("c_bigint_1 >= c_bigint_2", false)
+
+ .addExpr("c_float_2 >= c_float_1", true)
+ .addExpr("c_float_1 >= c_float_1", true)
+ .addExpr("c_float_1 >= c_float_2", false)
+
+ .addExpr("c_double_2 >= c_double_1", true)
+ .addExpr("c_double_1 >= c_double_1", true)
+ .addExpr("c_double_1 >= c_double_2", false)
+
+ .addExpr("c_decimal_2 >= c_decimal_1", true)
+ .addExpr("c_decimal_1 >= c_decimal_1", true)
+ .addExpr("c_decimal_1 >= c_decimal_2", false)
+
+ .addExpr("c_varchar_2 >= c_varchar_1", true)
+ .addExpr("c_varchar_1 >= c_varchar_1", true)
+ .addExpr("c_varchar_1 >= c_varchar_2", false)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGreaterThanOrEqualsException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false >= c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLessThan() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 < c_tinyint_1", false)
+ .addExpr("c_tinyint_1 < c_tinyint_1", false)
+ .addExpr("c_tinyint_1 < c_tinyint_2", true)
+
+ .addExpr("c_smallint_2 < c_smallint_1", false)
+ .addExpr("c_smallint_1 < c_smallint_1", false)
+ .addExpr("c_smallint_1 < c_smallint_2", true)
+
+ .addExpr("c_integer_2 < c_integer_1", false)
+ .addExpr("c_integer_1 < c_integer_1", false)
+ .addExpr("c_integer_1 < c_integer_2", true)
+
+ .addExpr("c_bigint_2 < c_bigint_1", false)
+ .addExpr("c_bigint_1 < c_bigint_1", false)
+ .addExpr("c_bigint_1 < c_bigint_2", true)
+
+ .addExpr("c_float_2 < c_float_1", false)
+ .addExpr("c_float_1 < c_float_1", false)
+ .addExpr("c_float_1 < c_float_2", true)
+
+ .addExpr("c_double_2 < c_double_1", false)
+ .addExpr("c_double_1 < c_double_1", false)
+ .addExpr("c_double_1 < c_double_2", true)
+
+ .addExpr("c_decimal_2 < c_decimal_1", false)
+ .addExpr("c_decimal_1 < c_decimal_1", false)
+ .addExpr("c_decimal_1 < c_decimal_2", true)
+
+ .addExpr("c_varchar_2 < c_varchar_1", false)
+ .addExpr("c_varchar_1 < c_varchar_1", false)
+ .addExpr("c_varchar_1 < c_varchar_2", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLessThanException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false < c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLessThanOrEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 <= c_tinyint_1", false)
+ .addExpr("c_tinyint_1 <= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 <= c_tinyint_2", true)
+
+ .addExpr("c_smallint_2 <= c_smallint_1", false)
+ .addExpr("c_smallint_1 <= c_smallint_1", true)
+ .addExpr("c_smallint_1 <= c_smallint_2", true)
+
+ .addExpr("c_integer_2 <= c_integer_1", false)
+ .addExpr("c_integer_1 <= c_integer_1", true)
+ .addExpr("c_integer_1 <= c_integer_2", true)
+
+ .addExpr("c_bigint_2 <= c_bigint_1", false)
+ .addExpr("c_bigint_1 <= c_bigint_1", true)
+ .addExpr("c_bigint_1 <= c_bigint_2", true)
+
+ .addExpr("c_float_2 <= c_float_1", false)
+ .addExpr("c_float_1 <= c_float_1", true)
+ .addExpr("c_float_1 <= c_float_2", true)
+
+ .addExpr("c_double_2 <= c_double_1", false)
+ .addExpr("c_double_1 <= c_double_1", true)
+ .addExpr("c_double_1 <= c_double_2", true)
+
+ .addExpr("c_decimal_2 <= c_decimal_1", false)
+ .addExpr("c_decimal_1 <= c_decimal_1", true)
+ .addExpr("c_decimal_1 <= c_decimal_2", true)
+
+ .addExpr("c_varchar_2 <= c_varchar_1", false)
+ .addExpr("c_varchar_1 <= c_varchar_1", true)
+ .addExpr("c_varchar_1 <= c_varchar_2", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLessThanOrEqualsException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false <= c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testIsNullAndIsNotNull() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 IS NOT NULL", true)
+ .addExpr("NULL IS NOT NULL", false)
+
+ .addExpr("1 IS NULL", false)
+ .addExpr("NULL IS NULL", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Override protected PCollection<BeamSqlRow> getTestPCollection() {
+ BeamSqlRowType type = BeamSqlRowType.create(
+ Arrays.asList(
+ "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
+ "c_smallint_0", "c_smallint_1", "c_smallint_2",
+ "c_integer_0", "c_integer_1", "c_integer_2",
+ "c_bigint_0", "c_bigint_1", "c_bigint_2",
+ "c_float_0", "c_float_1", "c_float_2",
+ "c_double_0", "c_double_1", "c_double_2",
+ "c_decimal_0", "c_decimal_1", "c_decimal_2",
+ "c_varchar_0", "c_varchar_1", "c_varchar_2",
+ "c_boolean_false", "c_boolean_true"
+ ),
+ Arrays.asList(
+ Types.TINYINT, Types.TINYINT, Types.TINYINT,
+ Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+ Types.INTEGER, Types.INTEGER, Types.INTEGER,
+ Types.BIGINT, Types.BIGINT, Types.BIGINT,
+ Types.FLOAT, Types.FLOAT, Types.FLOAT,
+ Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+ Types.DECIMAL, Types.DECIMAL, Types.DECIMAL,
+ Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
+ Types.BOOLEAN, Types.BOOLEAN
+ )
+ );
+ try {
+ return MockedBoundedTable
+ .of(type)
+ .addRows(
+ (byte) 0, (byte) 1, (byte) 2,
+ (short) 0, (short) 1, (short) 2,
+ 0, 1, 2,
+ 0L, 1L, 2L,
+ 0.0f, 1.0f, 2.0f,
+ 0.0, 1.0, 2.0,
+ BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE),
+ "a", "b", "c",
+ false, true
+ )
+ .buildIOReader(pipeline)
+ .setCoder(new BeamSqlRowCoder(type));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..f4416ce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for conditional functions.
+ */
+public class BeamSqlConditionalFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ @Test
+ public void testConditionalFunctions() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr(
+ "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END",
+ "hello"
+ )
+ .addExpr(
+ "CASE 2 "
+ + "WHEN 1 THEN 'hello' "
+ + "WHEN 3 THEN 'bond' "
+ + "ELSE 'world' END",
+ "world"
+ )
+ .addExpr(
+ "CASE "
+ + "WHEN 1 = 1 THEN 'hello' "
+ + "ELSE 'world' END",
+ "hello"
+ )
+ .addExpr(
+ "CASE "
+ + "WHEN 1 > 1 THEN 'hello' "
+ + "ELSE 'world' END",
+ "world"
+ )
+ .addExpr("NULLIF(5, 4) ", 5)
+ .addExpr("COALESCE(1, 5) ", 1)
+ .addExpr("COALESCE(NULL, 5) ", 5)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
new file mode 100644
index 0000000..181c991
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for date functions.
+ */
+public class BeamSqlDateFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ @Test public void testDateTimeFunctions() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
+ .addExpr("YEAR(ts)", 1986L)
+ .addExpr("QUARTER(ts)", 1L)
+ .addExpr("MONTH(ts)", 2L)
+ .addExpr("WEEK(ts)", 7L)
+ .addExpr("DAYOFMONTH(ts)", 15L)
+ .addExpr("DAYOFYEAR(ts)", 46L)
+ .addExpr("DAYOFWEEK(ts)", 7L)
+ .addExpr("HOUR(ts)", 11L)
+ .addExpr("MINUTE(ts)", 35L)
+ .addExpr("SECOND(ts)", 26L)
+ .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00"))
+ .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00"))
+ ;
+ checker.buildRunAndCheck();
+ }
+
+ @Test public void testDateTimeFunctions_currentTime() throws Exception {
+ String sql = "SELECT "
+ + "LOCALTIME as l,"
+ + "LOCALTIMESTAMP as l1,"
+ + "CURRENT_DATE as c1,"
+ + "CURRENT_TIME as c2,"
+ + "CURRENT_TIMESTAMP as c3"
+ + " FROM PCOLLECTION"
+ ;
+ PCollection<BeamSqlRow> rows = getTestPCollection().apply(
+ BeamSql.simpleQuery(sql));
+ PAssert.that(rows).satisfies(new Checker());
+ pipeline.run();
+ }
+
+ private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+ @Override public Void apply(Iterable<BeamSqlRow> input) {
+ Iterator<BeamSqlRow> iter = input.iterator();
+ assertTrue(iter.hasNext());
+ BeamSqlRow row = iter.next();
+ // LOCALTIME
+ Date date = new Date();
+ assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000);
+ assertTrue(date.getTime() - row.getDate(1).getTime() < 1000);
+ assertTrue(date.getTime() - row.getDate(2).getTime() < 1000);
+ assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000);
+ assertTrue(date.getTime() - row.getDate(4).getTime() < 1000);
+ assertFalse(iter.hasNext());
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..b408d78
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for logical functions.
+ */
+public class BeamSqlLogicalFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ @Test
+ public void testStringFunctions() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_integer = 1 AND c_bigint = 1", true)
+ .addExpr("c_integer = 1 OR c_bigint = 2", true)
+ .addExpr("NOT c_bigint = 2", true)
+ .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true)
+ .addExpr("c_integer = 2 AND c_bigint = 1", false)
+ .addExpr("c_integer = 2 OR c_bigint = 2", false)
+ .addExpr("NOT c_bigint = 1", false)
+ .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
new file mode 100644
index 0000000..995caaf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.junit.Test;
+
+/**
+ * Integration test for built-in MATH functions.
+ */
+public class BeamSqlMathFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ private static final int INTEGER_VALUE = 1;
+ private static final long LONG_VALUE = 1L;
+ private static final short SHORT_VALUE = 1;
+ private static final byte BYTE_VALUE = 1;
+ private static final double DOUBLE_VALUE = 1.0;
+ private static final float FLOAT_VALUE = 1.0f;
+ private static final BigDecimal DECIMAL_VALUE = new BigDecimal(1);
+
+ @Test
+ public void testAbs() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ABS(c_integer)", Math.abs(INTEGER_VALUE))
+ .addExpr("ABS(c_bigint)", Math.abs(LONG_VALUE))
+ .addExpr("ABS(c_smallint)", (short) Math.abs(SHORT_VALUE))
+ .addExpr("ABS(c_tinyint)", (byte) Math.abs(BYTE_VALUE))
+ .addExpr("ABS(c_double)", Math.abs(DOUBLE_VALUE))
+ .addExpr("ABS(c_float)", Math.abs(FLOAT_VALUE))
+ .addExpr("ABS(c_decimal)", new BigDecimal(Math.abs(DECIMAL_VALUE.doubleValue())))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testSqrt() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("SQRT(c_integer)", Math.sqrt(INTEGER_VALUE))
+ .addExpr("SQRT(c_bigint)", Math.sqrt(LONG_VALUE))
+ .addExpr("SQRT(c_smallint)", Math.sqrt(SHORT_VALUE))
+ .addExpr("SQRT(c_tinyint)", Math.sqrt(BYTE_VALUE))
+ .addExpr("SQRT(c_double)", Math.sqrt(DOUBLE_VALUE))
+ .addExpr("SQRT(c_float)", Math.sqrt(FLOAT_VALUE))
+ .addExpr("SQRT(c_decimal)", Math.sqrt(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testRound() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ROUND(c_integer, 0)", SqlFunctions.sround(INTEGER_VALUE, 0))
+ .addExpr("ROUND(c_bigint, 0)", SqlFunctions.sround(LONG_VALUE, 0))
+ .addExpr("ROUND(c_smallint, 0)", (short) SqlFunctions.sround(SHORT_VALUE, 0))
+ .addExpr("ROUND(c_tinyint, 0)", (byte) SqlFunctions.sround(BYTE_VALUE, 0))
+ .addExpr("ROUND(c_double, 0)", SqlFunctions.sround(DOUBLE_VALUE, 0))
+ .addExpr("ROUND(c_float, 0)", (float) SqlFunctions.sround(FLOAT_VALUE, 0))
+ .addExpr("ROUND(c_decimal, 0)",
+ new BigDecimal(SqlFunctions.sround(DECIMAL_VALUE.doubleValue(), 0)))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLn() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("LN(c_integer)", Math.log(INTEGER_VALUE))
+ .addExpr("LN(c_bigint)", Math.log(LONG_VALUE))
+ .addExpr("LN(c_smallint)", Math.log(SHORT_VALUE))
+ .addExpr("LN(c_tinyint)", Math.log(BYTE_VALUE))
+ .addExpr("LN(c_double)", Math.log(DOUBLE_VALUE))
+ .addExpr("LN(c_float)", Math.log(FLOAT_VALUE))
+ .addExpr("LN(c_decimal)", Math.log(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLog10() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("LOG10(c_integer)", Math.log10(INTEGER_VALUE))
+ .addExpr("LOG10(c_bigint)", Math.log10(LONG_VALUE))
+ .addExpr("LOG10(c_smallint)", Math.log10(SHORT_VALUE))
+ .addExpr("LOG10(c_tinyint)", Math.log10(BYTE_VALUE))
+ .addExpr("LOG10(c_double)", Math.log10(DOUBLE_VALUE))
+ .addExpr("LOG10(c_float)", Math.log10(FLOAT_VALUE))
+ .addExpr("LOG10(c_decimal)", Math.log10(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testExp() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("EXP(c_integer)", Math.exp(INTEGER_VALUE))
+ .addExpr("EXP(c_bigint)", Math.exp(LONG_VALUE))
+ .addExpr("EXP(c_smallint)", Math.exp(SHORT_VALUE))
+ .addExpr("EXP(c_tinyint)", Math.exp(BYTE_VALUE))
+ .addExpr("EXP(c_double)", Math.exp(DOUBLE_VALUE))
+ .addExpr("EXP(c_float)", Math.exp(FLOAT_VALUE))
+ .addExpr("EXP(c_decimal)", Math.exp(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testAcos() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ACOS(c_integer)", Math.acos(INTEGER_VALUE))
+ .addExpr("ACOS(c_bigint)", Math.acos(LONG_VALUE))
+ .addExpr("ACOS(c_smallint)", Math.acos(SHORT_VALUE))
+ .addExpr("ACOS(c_tinyint)", Math.acos(BYTE_VALUE))
+ .addExpr("ACOS(c_double)", Math.acos(DOUBLE_VALUE))
+ .addExpr("ACOS(c_float)", Math.acos(FLOAT_VALUE))
+ .addExpr("ACOS(c_decimal)", Math.acos(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testAsin() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ASIN(c_integer)", Math.asin(INTEGER_VALUE))
+ .addExpr("ASIN(c_bigint)", Math.asin(LONG_VALUE))
+ .addExpr("ASIN(c_smallint)", Math.asin(SHORT_VALUE))
+ .addExpr("ASIN(c_tinyint)", Math.asin(BYTE_VALUE))
+ .addExpr("ASIN(c_double)", Math.asin(DOUBLE_VALUE))
+ .addExpr("ASIN(c_float)", Math.asin(FLOAT_VALUE))
+ .addExpr("ASIN(c_decimal)", Math.asin(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testAtan() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ATAN(c_integer)", Math.atan(INTEGER_VALUE))
+ .addExpr("ATAN(c_bigint)", Math.atan(LONG_VALUE))
+ .addExpr("ATAN(c_smallint)", Math.atan(SHORT_VALUE))
+ .addExpr("ATAN(c_tinyint)", Math.atan(BYTE_VALUE))
+ .addExpr("ATAN(c_double)", Math.atan(DOUBLE_VALUE))
+ .addExpr("ATAN(c_float)", Math.atan(FLOAT_VALUE))
+ .addExpr("ATAN(c_decimal)", Math.atan(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testCot() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("COT(c_integer)", 1.0d / Math.tan(INTEGER_VALUE))
+ .addExpr("COT(c_bigint)", 1.0d / Math.tan(LONG_VALUE))
+ .addExpr("COT(c_smallint)", 1.0d / Math.tan(SHORT_VALUE))
+ .addExpr("COT(c_tinyint)", 1.0d / Math.tan(BYTE_VALUE))
+ .addExpr("COT(c_double)", 1.0d / Math.tan(DOUBLE_VALUE))
+ .addExpr("COT(c_float)", 1.0d / Math.tan(FLOAT_VALUE))
+ .addExpr("COT(c_decimal)", 1.0d / Math.tan(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testDegrees() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("DEGREES(c_integer)", Math.toDegrees(INTEGER_VALUE))
+ .addExpr("DEGREES(c_bigint)", Math.toDegrees(LONG_VALUE))
+ .addExpr("DEGREES(c_smallint)", Math.toDegrees(SHORT_VALUE))
+ .addExpr("DEGREES(c_tinyint)", Math.toDegrees(BYTE_VALUE))
+ .addExpr("DEGREES(c_double)", Math.toDegrees(DOUBLE_VALUE))
+ .addExpr("DEGREES(c_float)", Math.toDegrees(FLOAT_VALUE))
+ .addExpr("DEGREES(c_decimal)", Math.toDegrees(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testRadians() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("RADIANS(c_integer)", Math.toRadians(INTEGER_VALUE))
+ .addExpr("RADIANS(c_bigint)", Math.toRadians(LONG_VALUE))
+ .addExpr("RADIANS(c_smallint)", Math.toRadians(SHORT_VALUE))
+ .addExpr("RADIANS(c_tinyint)", Math.toRadians(BYTE_VALUE))
+ .addExpr("RADIANS(c_double)", Math.toRadians(DOUBLE_VALUE))
+ .addExpr("RADIANS(c_float)", Math.toRadians(FLOAT_VALUE))
+ .addExpr("RADIANS(c_decimal)", Math.toRadians(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testCos() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("COS(c_integer)", Math.cos(INTEGER_VALUE))
+ .addExpr("COS(c_bigint)", Math.cos(LONG_VALUE))
+ .addExpr("COS(c_smallint)", Math.cos(SHORT_VALUE))
+ .addExpr("COS(c_tinyint)", Math.cos(BYTE_VALUE))
+ .addExpr("COS(c_double)", Math.cos(DOUBLE_VALUE))
+ .addExpr("COS(c_float)", Math.cos(FLOAT_VALUE))
+ .addExpr("COS(c_decimal)", Math.cos(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testSin() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("SIN(c_integer)", Math.sin(INTEGER_VALUE))
+ .addExpr("SIN(c_bigint)", Math.sin(LONG_VALUE))
+ .addExpr("SIN(c_smallint)", Math.sin(SHORT_VALUE))
+ .addExpr("SIN(c_tinyint)", Math.sin(BYTE_VALUE))
+ .addExpr("SIN(c_double)", Math.sin(DOUBLE_VALUE))
+ .addExpr("SIN(c_float)", Math.sin(FLOAT_VALUE))
+ .addExpr("SIN(c_decimal)", Math.sin(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testTan() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("TAN(c_integer)", Math.tan(INTEGER_VALUE))
+ .addExpr("TAN(c_bigint)", Math.tan(LONG_VALUE))
+ .addExpr("TAN(c_smallint)", Math.tan(SHORT_VALUE))
+ .addExpr("TAN(c_tinyint)", Math.tan(BYTE_VALUE))
+ .addExpr("TAN(c_double)", Math.tan(DOUBLE_VALUE))
+ .addExpr("TAN(c_float)", Math.tan(FLOAT_VALUE))
+ .addExpr("TAN(c_decimal)", Math.tan(DECIMAL_VALUE.doubleValue()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testSign() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("SIGN(c_integer)", Integer.signum(INTEGER_VALUE))
+ .addExpr("SIGN(c_bigint)", (long) (Long.signum(LONG_VALUE)))
+ .addExpr("SIGN(c_smallint)", (short) (Integer.signum(SHORT_VALUE)))
+ .addExpr("SIGN(c_tinyint)", (byte) Integer.signum(BYTE_VALUE))
+ .addExpr("SIGN(c_double)", Math.signum(DOUBLE_VALUE))
+ .addExpr("SIGN(c_float)", Math.signum(FLOAT_VALUE))
+ .addExpr("SIGN(c_decimal)", BigDecimal.valueOf(DECIMAL_VALUE.signum()))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testPower() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("POWER(c_integer, 2)", Math.pow(INTEGER_VALUE, 2))
+ .addExpr("POWER(c_bigint, 2)", Math.pow(LONG_VALUE, 2))
+ .addExpr("POWER(c_smallint, 2)", Math.pow(SHORT_VALUE, 2))
+ .addExpr("POWER(c_tinyint, 2)", Math.pow(BYTE_VALUE, 2))
+ .addExpr("POWER(c_double, 2)", Math.pow(DOUBLE_VALUE, 2))
+ .addExpr("POWER(c_float, 2)", Math.pow(FLOAT_VALUE, 2))
+ .addExpr("POWER(c_decimal, 2)", Math.pow(DECIMAL_VALUE.doubleValue(), 2))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testPi() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("PI", Math.PI)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testAtan2() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("ATAN2(c_integer, 2)", Math.atan2(INTEGER_VALUE, 2))
+ .addExpr("ATAN2(c_bigint, 2)", Math.atan2(LONG_VALUE, 2))
+ .addExpr("ATAN2(c_smallint, 2)", Math.atan2(SHORT_VALUE, 2))
+ .addExpr("ATAN2(c_tinyint, 2)", Math.atan2(BYTE_VALUE, 2))
+ .addExpr("ATAN2(c_double, 2)", Math.atan2(DOUBLE_VALUE, 2))
+ .addExpr("ATAN2(c_float, 2)", Math.atan2(FLOAT_VALUE, 2))
+ .addExpr("ATAN2(c_decimal, 2)", Math.atan2(DECIMAL_VALUE.doubleValue(), 2))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testTruncate() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("TRUNCATE(c_integer, 2)", SqlFunctions.struncate(INTEGER_VALUE, 2))
+ .addExpr("TRUNCATE(c_bigint, 2)", SqlFunctions.struncate(LONG_VALUE, 2))
+ .addExpr("TRUNCATE(c_smallint, 2)", (short) SqlFunctions.struncate(SHORT_VALUE, 2))
+ .addExpr("TRUNCATE(c_tinyint, 2)", (byte) SqlFunctions.struncate(BYTE_VALUE, 2))
+ .addExpr("TRUNCATE(c_double, 2)", SqlFunctions.struncate(DOUBLE_VALUE, 2))
+ .addExpr("TRUNCATE(c_float, 2)", (float) SqlFunctions.struncate(FLOAT_VALUE, 2))
+ .addExpr("TRUNCATE(c_decimal, 2)", SqlFunctions.struncate(DECIMAL_VALUE, 2))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testRand() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("RAND(c_integer)", new Random(INTEGER_VALUE).nextDouble())
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testRandInteger() throws Exception{
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("RAND_INTEGER(c_integer, c_integer)",
+ new Random(INTEGER_VALUE).nextInt(INTEGER_VALUE))
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
new file mode 100644
index 0000000..7a51a95
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for string functions.
+ */
+public class BeamSqlStringFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ @Test
+ public void testStringFunctions() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("'hello' || ' world'", "hello world")
+ .addExpr("CHAR_LENGTH('hello')", 5)
+ .addExpr("CHARACTER_LENGTH('hello')", 5)
+ .addExpr("UPPER('hello')", "HELLO")
+ .addExpr("LOWER('HELLO')", "hello")
+
+ .addExpr("POSITION('world' IN 'helloworld')", 5)
+ .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+ .addExpr("TRIM(' hello ')", "hello")
+ .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
+ .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
+
+ .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
+ .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
+ .addExpr("SUBSTRING('hello' FROM 2)", "ello")
+ .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
+ .addExpr("INITCAP('hello world')", "Hello World")
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
new file mode 100644
index 0000000..2843e41
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSqlFnExecutor}.
+ */
+public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testBeamFilterRel() {
+ RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+ Arrays.asList(
+ rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+ Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+ rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
+ rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+ Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+ rexBuilder.makeExactLiteral(new BigDecimal(0))))));
+
+ BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
+ condition);
+
+ BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel);
+ executor.prepare();
+
+ Assert.assertEquals(1, executor.exps.size());
+
+ BeamSqlExpression l1Exp = executor.exps.get(0);
+ assertTrue(l1Exp instanceof BeamSqlAndExpression);
+ Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
+
+ Assert.assertEquals(2, l1Exp.getOperands().size());
+ BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
+ BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
+
+ assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression);
+ assertTrue(l1Right instanceof BeamSqlEqualsExpression);
+
+ Assert.assertEquals(2, l1Left.getOperands().size());
+ BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
+ BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
+ assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
+ assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
+
+ Assert.assertEquals(2, l1Right.getOperands().size());
+ BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
+ BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
+ assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
+ assertTrue(l1RightRight instanceof BeamSqlPrimitive);
+ }
+
+ @Test
+ public void testBeamProjectRel() {
+ BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
+ relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+ rexBuilder.identityProjects(relDataType), relDataType);
+ BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode);
+
+ executor.prepare();
+ Assert.assertEquals(4, executor.exps.size());
+ assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
+ assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
+ assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
+ assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
+ }
+
+
+ @Test
+ public void testBuildExpression_logical() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true),
+ rexBuilder.makeLiteral(false)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlAndExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OR,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true),
+ rexBuilder.makeLiteral(false)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlOrExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlNotExpression);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_andOr_invalidOperand() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true),
+ rexBuilder.makeLiteral("hello")
+ )
+ );
+ BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_not_invalidOperand() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello")
+ )
+ );
+ BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_not_invalidOperandCount() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true),
+ rexBuilder.makeLiteral(true)
+ )
+ );
+ BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+ @Test
+ public void testBuildExpression_arithmetic() {
+ testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+ testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+ testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+ testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+ testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+ }
+
+ private void testBuildArithmeticExpression(SqlOperator fn,
+ Class<? extends BeamSqlExpression> clazz) {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(fn, Arrays.asList(
+ rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+ rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+ ));
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+
+ assertTrue(exp.getClass().equals(clazz));
+ }
+
+ @Test
+ public void testBuildExpression_string() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello "),
+ rexBuilder.makeLiteral("world")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlConcatExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello"),
+ rexBuilder.makeLiteral("worldhello")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlPositionExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello"),
+ rexBuilder.makeLiteral("worldhello"),
+ rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE))
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlPositionExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlCharLengthExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlUpperExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
+ Arrays.asList(
+ rexBuilder.makeLiteral("HELLO")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlLowerExpression);
+
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
+ Arrays.asList(
+ rexBuilder.makeLiteral("hello")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlInitCapExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
+ Arrays.asList(
+ rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeLiteral("HELLO")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlTrimExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+ Arrays.asList(
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+ Arrays.asList(
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+ Arrays.asList(
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+ Arrays.asList(
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+ Arrays.asList(
+ rexBuilder.makeLiteral(true),
+ rexBuilder.makeLiteral("HELLO"),
+ rexBuilder.makeLiteral("HELLO")
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlCaseExpression);
+ }
+
+ @Test
+ public void testBuildExpression_date() {
+ RexNode rexNode;
+ BeamSqlExpression exp;
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
+ calendar.setTime(new Date());
+
+ // CEIL
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CEIL,
+ Arrays.asList(
+ rexBuilder.makeDateLiteral(calendar),
+ rexBuilder.makeFlag(TimeUnitRange.MONTH)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlDateCeilExpression);
+
+ // FLOOR
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR,
+ Arrays.asList(
+ rexBuilder.makeDateLiteral(calendar),
+ rexBuilder.makeFlag(TimeUnitRange.MONTH)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlDateFloorExpression);
+
+ // EXTRACT == EXTRACT_DATE?
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT,
+ Arrays.asList(
+ rexBuilder.makeFlag(TimeUnitRange.MONTH),
+ rexBuilder.makeDateLiteral(calendar)
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlExtractExpression);
+
+ // CURRENT_DATE
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE,
+ Arrays.<RexNode>asList(
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlCurrentDateExpression);
+
+ // LOCALTIME
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME,
+ Arrays.<RexNode>asList(
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlCurrentTimeExpression);
+
+ // LOCALTIMESTAMP
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP,
+ Arrays.<RexNode>asList(
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlCurrentTimestampExpression);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
new file mode 100644
index 0000000..c6478a6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.planner.BeamRuleSets;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link BeamSqlExpression}.
+ */
+public class BeamSqlFnExecutorTestBase {
+ public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+ public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
+
+ public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+ public static RelDataType relDataType;
+
+ public static BeamSqlRowType beamRowType;
+ public static BeamSqlRow record;
+
+ public static RelBuilder relBuilder;
+
+ @BeforeClass
+ public static void prepare() {
+ relDataType = TYPE_FACTORY.builder()
+ .add("order_id", SqlTypeName.BIGINT)
+ .add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE)
+ .add("order_time", SqlTypeName.BIGINT).build();
+
+ beamRowType = CalciteUtils.toBeamRowType(relDataType);
+ record = new BeamSqlRow(beamRowType);
+
+ record.addField(0, 1234567L);
+ record.addField(1, 0);
+ record.addField(2, 8.9);
+ record.addField(3, 1234567L);
+
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+ FrameworkConfig config = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+ .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+ .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+
+ relBuilder = RelBuilder.create(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
new file mode 100644
index 0000000..7bfbe20
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlIsNullExpression} and
+ * {@link BeamSqlIsNotNullExpression}.
+ */
+public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testIsNull() {
+ BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
+ new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+ Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+ BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
+ BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+ Assert.assertEquals(true, exp2.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testIsNotNull() {
+ BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
+ new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+ Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+ BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
+ BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+ Assert.assertEquals(false, exp2.evaluate(record).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
new file mode 100644
index 0000000..b6f65a1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}.
+ */
+public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testAnd() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+ Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+ Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testOr() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+ Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+ Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
new file mode 100644
index 0000000..28ed920
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCaseExpression.
+ */
+public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test public void accept() throws Exception {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertTrue(new BeamSqlCaseExpression(operands).accept());
+
+ // even param count
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+ // `when` type error
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+ // `then` type mixing
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ assertFalse(new BeamSqlCaseExpression(operands).accept());
+
+ }
+
+ @Test public void evaluate() throws Exception {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertEquals("hello", new BeamSqlCaseExpression(operands)
+ .evaluate(record).getValue());
+
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertEquals("world", new BeamSqlCaseExpression(operands)
+ .evaluate(record).getValue());
+
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+ assertEquals("hello1", new BeamSqlCaseExpression(operands)
+ .evaluate(record).getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..feefc45
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link BeamSqlCastExpression}.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ private List<BeamSqlExpression> operands;
+
+ @Before
+ public void setup() {
+ operands = new ArrayList<>();
+ }
+
+ @Test
+ public void testForOperands() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
+ Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
+ }
+
+ @Test
+ public void testForIntegerToBigintTypeCasting() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+ Assert.assertEquals(5L,
+ new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+ }
+
+ @Test
+ public void testForDoubleToBigIntCasting() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
+ Assert.assertEquals(5L,
+ new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+ }
+
+ @Test
+ public void testForIntegerToDateCast() {
+ // test for yyyyMMdd format
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testyyyyMMddDateFormat() {
+ //test for yyyy-MM-dd format
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testyyMMddDateFormat() {
+ // test for yy.MM.dd format
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testForTimestampCastExpression() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
+ Assert.assertEquals(SqlTypeName.TIMESTAMP,
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+ .getOutputType());
+ }
+
+ @Test
+ public void testDateTimeFormatWithMillis() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testDateTimeFormatWithTimezone() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testDateTimeFormat() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testForCastTypeNotSupported() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+}