You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:11 UTC
[48/59] beam git commit: move all implementation classes/packages
into impl package
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
new file mode 100644
index 0000000..f350087
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSqlFnExecutor}.
+ */
+public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
+
+ /** Verifies that a filter's AND condition is converted into the matching expression tree. */
+ @Test
+ public void testBeamFilterRel() {
+   // Two comparisons: column 0 <= 1000 and column 1 = 0.
+   RexNode lePart = rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+       Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+           rexBuilder.makeBigintLiteral(new BigDecimal(1000L))));
+   RexNode eqPart = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+       Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+           rexBuilder.makeExactLiteral(new BigDecimal(0))));
+   RexNode filter = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+       Arrays.asList(lePart, eqPart));
+
+   BeamSqlFnExecutor fnExecutor = new BeamSqlFnExecutor(
+       new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null, filter));
+   fnExecutor.prepare();
+
+   // Root: the single expression prepared for the filter condition.
+   Assert.assertEquals(1, fnExecutor.exps.size());
+   BeamSqlExpression root = fnExecutor.exps.get(0);
+   assertTrue(root instanceof BeamSqlAndExpression);
+   Assert.assertEquals(SqlTypeName.BOOLEAN, root.getOutputType());
+   Assert.assertEquals(2, root.getOperands().size());
+
+   // First level: the two comparisons, in the order they were given.
+   BeamSqlExpression lessOrEqual = (BeamSqlExpression) root.getOperands().get(0);
+   BeamSqlExpression equal = (BeamSqlExpression) root.getOperands().get(1);
+   assertTrue(lessOrEqual instanceof BeamSqlLessThanOrEqualsExpression);
+   assertTrue(equal instanceof BeamSqlEqualsExpression);
+
+   // Second level: each comparison is (input reference, literal).
+   Assert.assertEquals(2, lessOrEqual.getOperands().size());
+   BeamSqlExpression lessOrEqualRef = (BeamSqlExpression) lessOrEqual.getOperands().get(0);
+   BeamSqlExpression lessOrEqualLiteral = (BeamSqlExpression) lessOrEqual.getOperands().get(1);
+   assertTrue(lessOrEqualRef instanceof BeamSqlInputRefExpression);
+   assertTrue(lessOrEqualLiteral instanceof BeamSqlPrimitive);
+   Assert.assertEquals(2, equal.getOperands().size());
+   BeamSqlExpression equalRef = (BeamSqlExpression) equal.getOperands().get(0);
+   BeamSqlExpression equalLiteral = (BeamSqlExpression) equal.getOperands().get(1);
+   assertTrue(equalRef instanceof BeamSqlInputRefExpression);
+   assertTrue(equalLiteral instanceof BeamSqlPrimitive);
+ }
+
+ /** Verifies that an identity projection prepares one input reference per column. */
+ @Test
+ public void testBeamProjectRel() {
+   BeamRelNode projection = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
+       relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+       rexBuilder.identityProjects(relDataType), relDataType);
+   BeamSqlFnExecutor fnExecutor = new BeamSqlFnExecutor(projection);
+   fnExecutor.prepare();
+
+   // The size check pins the count, so the loop covers exactly indexes 0..3.
+   Assert.assertEquals(4, fnExecutor.exps.size());
+   for (BeamSqlExpression projected : fnExecutor.exps) {
+     assertTrue(projected instanceof BeamSqlInputRefExpression);
+   }
+ }
+
+
+ /** Verifies that AND, OR and NOT calls are built into their logical expression classes. */
+ @Test
+ public void testBuildExpression_logical() {
+   BeamSqlExpression conjunction = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.AND, Arrays.asList(
+           rexBuilder.makeLiteral(true),
+           rexBuilder.makeLiteral(false))));
+   assertTrue(conjunction instanceof BeamSqlAndExpression);
+
+   BeamSqlExpression disjunction = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.OR, Arrays.asList(
+           rexBuilder.makeLiteral(true),
+           rexBuilder.makeLiteral(false))));
+   assertTrue(disjunction instanceof BeamSqlOrExpression);
+
+   BeamSqlExpression negation = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.NOT, Arrays.asList(
+           rexBuilder.makeLiteral(true))));
+   assertTrue(negation instanceof BeamSqlNotExpression);
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when AND is given a non-boolean (string) operand.
+  * Only AND is exercised here.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_andOr_invalidOperand() {
+   RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+       Arrays.asList(
+           rexBuilder.makeLiteral(true),
+           rexBuilder.makeLiteral("hello")
+       )
+   );
+   BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+ /** Expects an {@link IllegalStateException} when NOT is given a non-boolean (string) operand. */
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_not_invalidOperand() {
+   RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+       Arrays.asList(
+           rexBuilder.makeLiteral("hello")
+       )
+   );
+   BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+
+ /** Expects an {@link IllegalStateException} when NOT is given two operands instead of one. */
+ @Test(expected = IllegalStateException.class)
+ public void testBuildExpression_logical_not_invalidOperandCount() {
+   RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+       Arrays.asList(
+           rexBuilder.makeLiteral(true),
+           rexBuilder.makeLiteral(true)
+       )
+   );
+   BeamSqlFnExecutor.buildExpression(rexNode);
+ }
+
+ /** Verifies that each arithmetic operator is built into its dedicated expression class. */
+ @Test
+ public void testBuildExpression_arithmetic() {
+   testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+   testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+   testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+   testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+   testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+ }
+
+ /**
+  * Builds {@code fn} applied to two BIGINT literals of value 1 and checks the class of the
+  * resulting expression.
+  *
+  * @param fn the operator to put at the root of the call
+  * @param clazz the exact class the built expression must have
+  */
+ private void testBuildArithmeticExpression(SqlOperator fn,
+     Class<? extends BeamSqlExpression> clazz) {
+   RexNode call = rexBuilder.makeCall(fn, Arrays.asList(
+       rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+       rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+   ));
+   BeamSqlExpression built = BeamSqlFnExecutor.buildExpression(call);
+
+   // assertEquals reports both the expected and the actual class when the check fails,
+   // which a bare assertTrue(a.equals(b)) does not.
+   Assert.assertEquals(clazz, built.getClass());
+ }
+
+ /**
+  * Verifies that each string operator exercised below, and CASE, is built into its dedicated
+  * expression class.
+  */
+ @Test
+ public void testBuildExpression_string() {
+   // CONCAT
+   BeamSqlExpression concat = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.CONCAT, Arrays.asList(
+           rexBuilder.makeLiteral("hello "),
+           rexBuilder.makeLiteral("world"))));
+   assertTrue(concat instanceof BeamSqlConcatExpression);
+
+   // POSITION, two-operand form
+   BeamSqlExpression position = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.POSITION, Arrays.asList(
+           rexBuilder.makeLiteral("hello"),
+           rexBuilder.makeLiteral("worldhello"))));
+   assertTrue(position instanceof BeamSqlPositionExpression);
+
+   // POSITION, with a third operand (an INTEGER cast of the literal 1)
+   BeamSqlExpression positionFrom = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.POSITION, Arrays.asList(
+           rexBuilder.makeLiteral("hello"),
+           rexBuilder.makeLiteral("worldhello"),
+           rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+               rexBuilder.makeBigintLiteral(BigDecimal.ONE)))));
+   assertTrue(positionFrom instanceof BeamSqlPositionExpression);
+
+   // CHAR_LENGTH
+   BeamSqlExpression charLength = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH, Arrays.asList(
+           rexBuilder.makeLiteral("hello"))));
+   assertTrue(charLength instanceof BeamSqlCharLengthExpression);
+
+   // UPPER
+   BeamSqlExpression upper = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.UPPER, Arrays.asList(
+           rexBuilder.makeLiteral("hello"))));
+   assertTrue(upper instanceof BeamSqlUpperExpression);
+
+   // LOWER
+   BeamSqlExpression lower = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.LOWER, Arrays.asList(
+           rexBuilder.makeLiteral("HELLO"))));
+   assertTrue(lower instanceof BeamSqlLowerExpression);
+
+   // INITCAP
+   BeamSqlExpression initCap = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.INITCAP, Arrays.asList(
+           rexBuilder.makeLiteral("hello"))));
+   assertTrue(initCap instanceof BeamSqlInitCapExpression);
+
+   // TRIM
+   BeamSqlExpression trim = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.TRIM, Arrays.asList(
+           rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeLiteral("HELLO"))));
+   assertTrue(trim instanceof BeamSqlTrimExpression);
+
+   // SUBSTRING, two-operand form
+   BeamSqlExpression substring = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING, Arrays.asList(
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));
+   assertTrue(substring instanceof BeamSqlSubstringExpression);
+
+   // SUBSTRING, three-operand form
+   BeamSqlExpression substringFor = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING, Arrays.asList(
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));
+   assertTrue(substringFor instanceof BeamSqlSubstringExpression);
+
+   // OVERLAY, three-operand form
+   BeamSqlExpression overlay = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY, Arrays.asList(
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));
+   assertTrue(overlay instanceof BeamSqlOverlayExpression);
+
+   // OVERLAY, four-operand form
+   BeamSqlExpression overlayFor = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY, Arrays.asList(
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+           rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));
+   assertTrue(overlayFor instanceof BeamSqlOverlayExpression);
+
+   // CASE
+   BeamSqlExpression caseWhen = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.CASE, Arrays.asList(
+           rexBuilder.makeLiteral(true),
+           rexBuilder.makeLiteral("HELLO"),
+           rexBuilder.makeLiteral("HELLO"))));
+   assertTrue(caseWhen instanceof BeamSqlCaseExpression);
+ }
+
+ /** Verifies that date/time operators are built into their dedicated expression classes. */
+ @Test
+ public void testBuildExpression_date() {
+   // Current instant on a GMT calendar, used as the date literal below.
+   Calendar now = Calendar.getInstance();
+   now.setTimeZone(TimeZone.getTimeZone("GMT"));
+   now.setTime(new Date());
+
+   // CEIL
+   BeamSqlExpression ceil = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.CEIL, Arrays.asList(
+           rexBuilder.makeDateLiteral(now),
+           rexBuilder.makeFlag(TimeUnitRange.MONTH))));
+   assertTrue(ceil instanceof BeamSqlDateCeilExpression);
+
+   // FLOOR
+   BeamSqlExpression floor = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.FLOOR, Arrays.asList(
+           rexBuilder.makeDateLiteral(now),
+           rexBuilder.makeFlag(TimeUnitRange.MONTH))));
+   assertTrue(floor instanceof BeamSqlDateFloorExpression);
+
+   // EXTRACT == EXTRACT_DATE?
+   BeamSqlExpression extract = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT, Arrays.asList(
+           rexBuilder.makeFlag(TimeUnitRange.MONTH),
+           rexBuilder.makeDateLiteral(now))));
+   assertTrue(extract instanceof BeamSqlExtractExpression);
+
+   // CURRENT_DATE
+   BeamSqlExpression currentDate = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE, Arrays.<RexNode>asList()));
+   assertTrue(currentDate instanceof BeamSqlCurrentDateExpression);
+
+   // LOCALTIME
+   BeamSqlExpression localTime = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME, Arrays.<RexNode>asList()));
+   assertTrue(localTime instanceof BeamSqlCurrentTimeExpression);
+
+   // LOCALTIMESTAMP
+   BeamSqlExpression localTimestamp = BeamSqlFnExecutor.buildExpression(
+       rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP, Arrays.<RexNode>asList()));
+   assertTrue(localTimestamp instanceof BeamSqlCurrentTimestampExpression);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
new file mode 100644
index 0000000..388c556
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for tests of {@link BeamSqlFnExecutor} and of subclasses of
+ * {@link BeamSqlExpression}. It exposes Calcite builders, a four-column row type and one
+ * populated row as static fields.
+ */
+public class BeamSqlFnExecutorTestBase {
+  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
+
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  public static RelDataType relDataType;
+
+  public static BeamSqlRowType beamRowType;
+  public static BeamSqlRow record;
+
+  public static RelBuilder relBuilder;
+
+  /** Populates the static row type, sample row and {@link RelBuilder} before the tests run. */
+  @BeforeClass
+  public static void prepare() {
+    relDataType = createRelDataType();
+    beamRowType = CalciteUtils.toBeamRowType(relDataType);
+    record = createSampleRow(beamRowType);
+    relBuilder = RelBuilder.create(createFrameworkConfig());
+  }
+
+  /** Describes the columns order_id, site_id, price and order_time. */
+  private static RelDataType createRelDataType() {
+    return TYPE_FACTORY.builder()
+        .add("order_id", SqlTypeName.BIGINT)
+        .add("site_id", SqlTypeName.INTEGER)
+        .add("price", SqlTypeName.DOUBLE)
+        .add("order_time", SqlTypeName.BIGINT)
+        .build();
+  }
+
+  /** Creates a row of the given type with a value in each of its four fields. */
+  private static BeamSqlRow createSampleRow(BeamSqlRowType rowType) {
+    BeamSqlRow row = new BeamSqlRow(rowType);
+    row.addField(0, 1234567L);
+    row.addField(1, 0);
+    row.addField(2, 8.9);
+    row.addField(3, 1234567L);
+    return row;
+  }
+
+  /** Assembles the Calcite framework configuration from which the {@link RelBuilder} is made. */
+  private static FrameworkConfig createFrameworkConfig() {
+    SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+
+    final List<RelTraitDef> traits = new ArrayList<>();
+    traits.add(ConventionTraitDef.INSTANCE);
+    traits.add(RelCollationTraitDef.INSTANCE);
+
+    return Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+        .defaultSchema(rootSchema)
+        .traitDefs(traits)
+        .context(Contexts.EMPTY_CONTEXT)
+        .ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null)
+        .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
new file mode 100644
index 0000000..5278871
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlIsNullExpression} and
+ * {@link BeamSqlIsNotNullExpression}.
+ */
+public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
+
+  /** IS NULL yields false for field 0 of the shared record and true for a null literal. */
+  @Test
+  public void testIsNull() {
+    BeamSqlInputRefExpression firstField =
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    BeamSqlIsNullExpression onField = new BeamSqlIsNullExpression(firstField);
+    Assert.assertEquals(false, onField.evaluate(record).getValue());
+
+    BeamSqlIsNullExpression onNullLiteral = new BeamSqlIsNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(true, onNullLiteral.evaluate(record).getValue());
+  }
+
+  /** IS NOT NULL yields true for field 0 of the shared record and false for a null literal. */
+  @Test
+  public void testIsNotNull() {
+    BeamSqlInputRefExpression firstField =
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    BeamSqlIsNotNullExpression onField = new BeamSqlIsNotNullExpression(firstField);
+    Assert.assertEquals(true, onField.evaluate(record).getValue());
+
+    BeamSqlIsNotNullExpression onNullLiteral = new BeamSqlIsNotNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(false, onNullLiteral.evaluate(record).getValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
new file mode 100644
index 0000000..f6e33b5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}.
+ */
+public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  /** AND is true for two true operands and false once a false operand is appended. */
+  @Test
+  public void testAnd() {
+    List<BeamSqlExpression> allTrue = new ArrayList<>();
+    allTrue.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    allTrue.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    Assert.assertTrue(new BeamSqlAndExpression(allTrue).evaluate(record).getValue());
+
+    // Same two operands followed by a false one.
+    List<BeamSqlExpression> withFalse = new ArrayList<>(allTrue);
+    withFalse.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    Assert.assertFalse(new BeamSqlAndExpression(withFalse).evaluate(record).getValue());
+  }
+
+  /** OR is false for two false operands and true once a true operand is appended. */
+  @Test
+  public void testOr() {
+    List<BeamSqlExpression> allFalse = new ArrayList<>();
+    allFalse.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    allFalse.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    Assert.assertFalse(new BeamSqlOrExpression(allFalse).evaluate(record).getValue());
+
+    // Same two operands followed by a true one.
+    List<BeamSqlExpression> withTrue = new ArrayList<>(allFalse);
+    withTrue.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    Assert.assertTrue(new BeamSqlOrExpression(withTrue).evaluate(record).getValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
new file mode 100644
index 0000000..068f041
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BeamSqlCaseExpression}.
+ */
+public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  /** Collects the given expressions, in order, into a fresh mutable operand list. */
+  private static List<BeamSqlExpression> operandsOf(BeamSqlExpression... expressions) {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    for (BeamSqlExpression expression : expressions) {
+      operands.add(expression);
+    }
+    return operands;
+  }
+
+  /** Checks which operand lists {@code accept()} approves and which it rejects. */
+  @Test public void accept() throws Exception {
+    // one boolean `when`, one `then` and an `else` of the same type
+    assertTrue(new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"))).accept());
+
+    // even param count
+    assertFalse(new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"))).accept());
+
+    // `when` type error
+    assertFalse(new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"))).accept());
+
+    // `then` type mixing
+    assertFalse(new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))).accept());
+  }
+
+  /** Checks that evaluation returns the first matching `then`, or the `else` if none match. */
+  @Test public void evaluate() throws Exception {
+    // `when` is true: its `then` value is returned
+    assertEquals("hello", new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")))
+        .evaluate(record).getValue());
+
+    // `when` is false: the trailing `else` value is returned
+    assertEquals("world", new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")))
+        .evaluate(record).getValue());
+
+    // first `when` is false, second is true: the second `then` value is returned
+    assertEquals("hello1", new BeamSqlCaseExpression(operandsOf(
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"),
+        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")))
+        .evaluate(record).getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..0c0aaa5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BeamSqlCastExpression}: operand validation and casts to BIGINT, DATE and TIMESTAMP.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ private List<BeamSqlExpression> operands; // operand list handed to the cast under test
+
+ @Before
+ public void setup() {
+ operands = new ArrayList<>(); // fresh, empty operand list before every test
+ }
+
+ @Test
+ public void testForOperands() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa")); // a second operand
+ Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
+ }
+
+ @Test
+ public void testForIntegerToBigintTypeCasting() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+ Assert.assertEquals(5L,
+ new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+ }
+
+ @Test
+ public void testForDoubleToBigIntCasting() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45)); // fractional part is dropped: 5.45 -> 5
+ Assert.assertEquals(5L,
+ new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+ }
+
+ @Test
+ public void testForIntegerToDateCast() {
+ // an INTEGER in yyyyMMdd form is cast to the matching DATE
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testyyyyMMddDateFormat() {
+ // a VARCHAR in yyyy-MM-dd form is cast to the matching DATE
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testyyMMddDateFormat() {
+ // a VARCHAR in yy.MM.dd form is cast to the matching DATE (two-digit year 17 -> 2017)
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+ Assert.assertEquals(Date.valueOf("2017-05-21"),
+ new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testForTimestampCastExpression() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989")); // only the output type is checked
+ Assert.assertEquals(SqlTypeName.TIMESTAMP,
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+ .getOutputType());
+ }
+
+ @Test
+ public void testDateTimeFormatWithMillis() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), // NOTE(review): next midnight, not the input instant - presumably the cast rounds; confirm
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testDateTimeFormatWithTimezone() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), // NOTE(review): same next-midnight expectation; how the PST suffix is handled is not visible here
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test
+ public void testDateTimeFormat() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59")); // no fractional seconds: value is expected unchanged
+ Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testForCastTypeNotSupported() {
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+ Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), // a RuntimeException is expected before this comparison can succeed
+ new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
new file mode 100644
index 0000000..ae3a12f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlCompareExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Exercises the {@link BeamSqlCompareExpression} family of operators against the shared test record.
+ */
+public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testEqual() {
+ // field 0 is compared with a different value first, then with the value it is expected to hold
+ BeamSqlInputRefExpression field0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+ BeamSqlEqualsExpression differentValue = new BeamSqlEqualsExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
+ Assert.assertEquals(false, differentValue.evaluate(record).getValue());
+
+ BeamSqlEqualsExpression sameValue = new BeamSqlEqualsExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+ Assert.assertEquals(true, sameValue.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testLargerThan() {
+ // strict comparison: an equal bound must yield false, a bound one below must yield true
+ BeamSqlInputRefExpression field0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+ BeamSqlGreaterThanExpression equalBound = new BeamSqlGreaterThanExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+ Assert.assertEquals(false, equalBound.evaluate(record).getValue());
+
+ BeamSqlGreaterThanExpression lowerBound = new BeamSqlGreaterThanExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
+ Assert.assertEquals(true, lowerBound.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testLargerThanEqual() {
+ // inclusive comparison: an equal bound must yield true, a bound one above must yield false
+ BeamSqlInputRefExpression field0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+ BeamSqlGreaterThanOrEqualsExpression equalBound = new BeamSqlGreaterThanOrEqualsExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+ Assert.assertEquals(true, equalBound.evaluate(record).getValue());
+
+ BeamSqlGreaterThanOrEqualsExpression higherBound = new BeamSqlGreaterThanOrEqualsExpression(
+ Arrays.asList(field0, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
+ Assert.assertEquals(false, higherBound.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testLessThan() {
+ // INTEGER field 1 is expected to be below 1 but not below -1
+ BeamSqlInputRefExpression field1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1);
+ BeamSqlLessThanExpression belowOne = new BeamSqlLessThanExpression(
+ Arrays.asList(field1, BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));
+ Assert.assertEquals(true, belowOne.evaluate(record).getValue());
+
+ BeamSqlLessThanExpression belowMinusOne = new BeamSqlLessThanExpression(
+ Arrays.asList(field1, BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)));
+ Assert.assertEquals(false, belowMinusOne.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testLessThanEqual() {
+ // DOUBLE field 2 is expected to be at most 8.9 but greater than 8.0
+ BeamSqlInputRefExpression field2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2);
+ BeamSqlLessThanOrEqualsExpression upTo89 = new BeamSqlLessThanOrEqualsExpression(
+ Arrays.asList(field2, BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
+ Assert.assertEquals(true, upTo89.evaluate(record).getValue());
+
+ BeamSqlLessThanOrEqualsExpression upTo80 = new BeamSqlLessThanOrEqualsExpression(
+ Arrays.asList(field2, BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
+ Assert.assertEquals(false, upTo80.evaluate(record).getValue());
+ }
+
+ @Test
+ public void testNotEqual() {
+ // BIGINT field 3 is expected to equal 1234567L, so only the comparison with 0L differs
+ BeamSqlInputRefExpression field3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3);
+ BeamSqlNotEqualsExpression sameValue = new BeamSqlNotEqualsExpression(
+ Arrays.asList(field3, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+ Assert.assertEquals(false, sameValue.evaluate(record).getValue());
+
+ BeamSqlNotEqualsExpression differentValue = new BeamSqlNotEqualsExpression(
+ Arrays.asList(field3, BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
+ Assert.assertEquals(true, differentValue.evaluate(record).getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
new file mode 100644
index 0000000..c78f9c0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlInputRefExpression}: typed reads of fields of the shared test record.
+ */
+public class BeamSqlInputRefExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testRefInRange() {
+ // each reference must return exactly what the record's typed getter returns
+ Assert.assertEquals(record.getLong(0),
+ new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0).evaluate(record).getValue());
+
+ Assert.assertEquals(record.getInteger(1),
+ new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1).evaluate(record).getValue());
+
+ Assert.assertEquals(record.getDouble(2),
+ new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2).evaluate(record).getValue());
+
+ Assert.assertEquals(record.getLong(3),
+ new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3).evaluate(record).getValue());
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testRefOutOfRange() {
+ // index 4 is expected to be out of range for the shared record
+ new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4).evaluate(record).getValue();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTypeUnMatch() {
+ // field 0 is read as BIGINT in testRefInRange, so an INTEGER reference to it must be rejected
+ new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0).evaluate(record).getValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
new file mode 100644
index 0000000..c4e3d3f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlPrimitive}: matching values evaluate to themselves, mismatches throw.
+ */
+public class BeamSqlPrimitiveTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testPrimitiveInt() {
+ BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100);
+ Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPrimitiveTypeUnMatch1() {
+ BeamSqlPrimitive<Long> longAsInteger = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L);
+ Assert.assertEquals(longAsInteger.getValue(), longAsInteger.evaluate(record).getValue());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPrimitiveTypeUnMatch2() {
+ BeamSqlPrimitive<Long> longAsDecimal = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L);
+ Assert.assertEquals(longAsDecimal.getValue(), longAsDecimal.evaluate(record).getValue());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPrimitiveTypeUnMatch3() {
+ BeamSqlPrimitive<Long> longAsFloat = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L);
+ Assert.assertEquals(longAsFloat.getValue(), longAsFloat.evaluate(record).getValue());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPrimitiveTypeUnMatch4() {
+ BeamSqlPrimitive<Long> longAsDouble = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L);
+ Assert.assertEquals(longAsDouble.getValue(), longAsDouble.evaluate(record).getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
new file mode 100644
index 0000000..2e01737
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlReinterpretExpression}: which operand/target types accept() allows, and a DATE to BIGINT evaluation.
+ */
+public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test public void accept() throws Exception {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ // DATE, TIMESTAMP and TIME operands are each accepted when the target type is BIGINT
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, new Date()));
+ assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date()));
+ assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+ operands.clear();
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date());
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+ assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+ // a VARCHAR operand is rejected (only the DATE, TIMESTAMP and TIME operands above are accepted)
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+ assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+
+ // a target type other than BIGINT (here TINYINT) is rejected even for an accepted operand type
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+ assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.TINYINT).accept());
+ }
+
+ @Test public void evaluate() throws Exception {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ // a DATE set to 1000 ms since the epoch is expected to reinterpret to the BIGINT 1000
+ Date d = new Date();
+ d.setTime(1000);
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
+ assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
+ .evaluate(record).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
new file mode 100644
index 0000000..c4732f5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlUdfExpression: wraps a static Java method, looked up via reflection, as a UDF.
+ */
+public class BeamSqlUdfExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void testUdf() throws NoSuchMethodException, SecurityException {
+ List<BeamSqlExpression> udfArgs = new ArrayList<>();
+ udfArgs.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+ java.lang.reflect.Method negate = UdfFn.class.getMethod("negative", Integer.class);
+ BeamSqlUdfExpression udfCall =
+ new BeamSqlUdfExpression(negate, udfArgs, SqlTypeName.INTEGER);
+
+ Assert.assertEquals(-10, udfCall.evaluate(record).getValue());
+ }
+
+ /**
+ * UDF example: negates its argument, mapping {@code null} to 0.
+ */
+ public static final class UdfFn {
+ public static int negative(Integer number) {
+ return number != null ? -number : 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
new file mode 100644
index 0000000..44001f9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamSqlArithmeticExpression}: accept() rules and the plus, minus, multiply, divide and mod operators.
+ */
+public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
+
+ @Test public void testAccept_normal() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // byte, short
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+ assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+ // integer, long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+ // float, double
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+ assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+ // float, varchar: a VARCHAR operand is not accepted
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "1"));
+ assertFalse(new BeamSqlPlusExpression(operands).accept());
+ }
+
+ @Test public void testAccept_exception() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ // despite the method name, no exception is expected: accept() must simply return false
+ // more than 2 operands
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+ assertFalse(new BeamSqlPlusExpression(operands).accept());
+
+ // byte, boolean: a BOOLEAN operand is not accepted
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+ assertFalse(new BeamSqlPlusExpression(operands).accept());
+ }
+
+ @Test public void testPlus() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // integer + integer => integer
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+ // integer + long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+ // long + long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+ // float + long => float
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(Float.valueOf(1.1F + 1),
+ new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+ // double + long => double
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+ }
+
+ @Test public void testMinus() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // integer - integer => integer
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+ // integer - long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+ // long - long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+ // float - long: compared through floatValue() with a 0.1 tolerance
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2.1F - 1L,
+ new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
+
+ // double - long => double
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+ }
+
+ @Test public void testMultiply() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // integer * integer => integer
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+ // integer * long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+ // long * long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+ // float * long => float
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(Float.valueOf(2.1F * 1L),
+ new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+ // double * long => double
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+ }
+
+ @Test public void testDivide() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // integer / integer => integer
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+ assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+ // integer / long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+ // long / long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+ // float / long => float
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2.1F / 1,
+ new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+ // double / long => double
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+ assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+ }
+
+ @Test public void testMod() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+
+ // integer % integer => integer
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+ assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
+
+ // integer % long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+ assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+
+ // long % long => long
+ operands.clear();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+ assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
new file mode 100644
index 0000000..cd390c4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentDateExpression.
+ */
+public class BeamSqlCurrentDateExpressionTest extends BeamSqlDateExpressionTestBase {
+ @Test
+ public void test() {
+ Assert.assertEquals(
+ SqlTypeName.DATE,
+ new BeamSqlCurrentDateExpression()
+ .evaluate(BeamSqlFnExecutorTestBase.record).getOutputType()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
new file mode 100644
index 0000000..416df01
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentTimeExpression.
+ */
+public class BeamSqlCurrentTimeExpressionTest extends BeamSqlDateExpressionTestBase {
+ @Test
+ public void test() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ assertEquals(SqlTypeName.TIME,
+ new BeamSqlCurrentTimeExpression(operands).evaluate(record).getOutputType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
new file mode 100644
index 0000000..d44b6c1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentTimestampExpression.
+ */
+public class BeamSqlCurrentTimestampExpressionTest extends BeamSqlDateExpressionTestBase {
+ @Test
+ public void test() {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ assertEquals(SqlTypeName.TIMESTAMP,
+ new BeamSqlCurrentTimestampExpression(operands).evaluate(record).getOutputType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
new file mode 100644
index 0000000..5bc99e8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@code BeamSqlDateCeilExpression}.
+ */
+public class BeamSqlDateCeilExpressionTest extends BeamSqlDateExpressionTestBase {
+  /**
+   * Rounds 2017-05-22 09:10:11 up to the next YEAR and MONTH boundary and compares the
+   * results with the expected dates produced by {@code str2DateTime}.
+   */
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> args = new ArrayList<>();
+    // operand 0: the date being rounded; operand 1: the unit to round to
+    args.add(BeamSqlPrimitive.of(SqlTypeName.DATE, str2DateTime("2017-05-22 09:10:11")));
+    args.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+
+    // YEAR
+    BeamSqlDateCeilExpression ceilToYear = new BeamSqlDateCeilExpression(args);
+    Assert.assertEquals(
+        str2DateTime("2018-01-01 00:00:00"),
+        ceilToYear.evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+
+    // MONTH: swap only the unit operand and build a new expression
+    args.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+    BeamSqlDateCeilExpression ceilToMonth = new BeamSqlDateCeilExpression(args);
+    Assert.assertEquals(
+        str2DateTime("2017-06-01 00:00:00"),
+        ceilToMonth.evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
new file mode 100644
index 0000000..0e57404
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+
+/**
+ * Common parent of the date-related expression tests; supplies helpers that turn
+ * {@code yyyy-MM-dd HH:mm:ss} strings into time values.
+ */
+public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
+  /** Pattern accepted by both parsing helpers. */
+  private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+  /**
+   * Parses {@code dateStr} and returns the result as epoch milliseconds.
+   *
+   * <p>No time zone is set on the formatter here, so the JVM default zone applies.
+   * NOTE(review): {@link #str2DateTime(String)} pins GMT instead; confirm the difference
+   * is intentional before relying on both helpers in the same test.
+   *
+   * @param dateStr text in {@code yyyy-MM-dd HH:mm:ss} form
+   * @return milliseconds since the epoch for the parsed instant
+   * @throws RuntimeException wrapping the {@link ParseException} when parsing fails
+   */
+  protected long str2LongTime(String dateStr) {
+    SimpleDateFormat localFormat = new SimpleDateFormat(DATE_TIME_PATTERN);
+    return parseOrThrow(localFormat, dateStr).getTime();
+  }
+
+  /**
+   * Parses {@code dateStr} as a GMT date-time.
+   *
+   * @param dateStr text in {@code yyyy-MM-dd HH:mm:ss} form
+   * @return the parsed {@link Date}
+   * @throws RuntimeException wrapping the {@link ParseException} when parsing fails
+   */
+  protected Date str2DateTime(String dateStr) {
+    SimpleDateFormat gmtFormat = new SimpleDateFormat(DATE_TIME_PATTERN);
+    gmtFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+    return parseOrThrow(gmtFormat, dateStr);
+  }
+
+  /** Runs {@code format.parse}, rethrowing the checked failure as an unchecked one. */
+  private static Date parseOrThrow(SimpleDateFormat format, String dateStr) {
+    try {
+      return format.parse(dateStr);
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
new file mode 100644
index 0000000..ecab54b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlDateFloorExpression}.
+ */
+public class BeamSqlDateFloorExpressionTest extends BeamSqlDateExpressionTestBase {
+ @Test public void evaluate() throws Exception {
+ List<BeamSqlExpression> operands = new ArrayList<>();
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE,
+ str2DateTime("2017-05-22 09:10:11")));
+ // YEAR
+ operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+ assertEquals(str2DateTime("2017-01-01 00:00:00"),
+ new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+ // MONTH
+ operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+ assertEquals(str2DateTime("2017-05-01 00:00:00"),
+ new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+
+ }
+}