You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/07 16:54:28 UTC
[12/50] [abbrv] beam git commit: [BEAM-2203] Implement TIMESTAMPADD
[BEAM-2203] Implement TIMESTAMPADD
Add support for TIMESTAMPADD(interval, multiplier, TIMESTAMP)
fixup! [BEAM-2203] Implement TIMESTAMPADD
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/820f8aff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/820f8aff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/820f8aff
Branch: refs/heads/mr-runner
Commit: 820f8aff944d1eda69c9226086848d8b39fcf62f
Parents: a33c717
Author: Anton Kedin <ke...@kedin-macbookpro.roam.corp.google.com>
Authored: Fri Oct 27 10:15:08 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Wed Nov 1 16:29:07 2017 -0700
----------------------------------------------------------------------
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 19 ++-
.../interpreter/operator/BeamSqlPrimitive.java | 8 +-
.../date/BeamSqlDatetimePlusExpression.java | 129 +++++++++++++++
.../date/BeamSqlIntervalMultiplyExpression.java | 103 ++++++++++++
.../operator/date/TimeUnitUtils.java | 54 +++++++
.../extensions/sql/impl/utils/SqlTypeUtils.java | 46 ++++++
.../impl/interpreter/BeamSqlFnExecutorTest.java | 30 ++++
.../date/BeamSqlDateExpressionTestBase.java | 5 +-
.../date/BeamSqlDatetimePlusExpressionTest.java | 155 +++++++++++++++++++
.../BeamSqlIntervalMultiplyExpressionTest.java | 107 +++++++++++++
.../operator/date/TimeUnitUtilsTest.java | 54 +++++++
.../sql/impl/utils/SqlTypeUtilsTest.java | 76 +++++++++
.../BeamSqlDateFunctionsIntegrationTest.java | 39 ++++-
13 files changed, 818 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 8f9797b..8770055 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
+
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
@@ -49,7 +50,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
@@ -143,7 +146,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
// NlsString is not serializable, we need to convert
// it to string explicitly.
return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
- } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+ } else if (isDateNode(type, value)) {
// does this actually make sense?
// Calcite actually treat Calendar as the java type of Date Literal
return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
@@ -235,7 +238,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
ret = new BeamSqlMinusExpression(subExps);
break;
case "*":
- ret = new BeamSqlMultiplyExpression(subExps);
+ if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+ ret = new BeamSqlMultiplyExpression(subExps);
+ } else {
+ ret = new BeamSqlIntervalMultiplyExpression(subExps);
+ }
break;
case "/":
case "/INT":
@@ -369,6 +376,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
case "CURRENT_DATE":
return new BeamSqlCurrentDateExpression();
+ case "DATETIME_PLUS":
+ return new BeamSqlDatetimePlusExpression(subExps);
+
case "CASE":
ret = new BeamSqlCaseExpression(subExps);
@@ -423,6 +433,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
return ret;
}
+ private static boolean isDateNode(SqlTypeName type, Object value) {
+ return (type == SqlTypeName.DATE || type == SqlTypeName.TIMESTAMP)
+ && value instanceof Calendar;
+ }
+
@Override
public void prepare() {
}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 9175caa..21cbc80 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -133,9 +134,12 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
case TIMESTAMP:
case DATE:
return value instanceof Date;
- case INTERVAL_HOUR:
- return value instanceof BigDecimal;
+ case INTERVAL_SECOND:
case INTERVAL_MINUTE:
+ case INTERVAL_HOUR:
+ case INTERVAL_DAY:
+ case INTERVAL_MONTH:
+ case INTERVAL_YEAR:
return value instanceof BigDecimal;
case SYMBOL:
// for SYMBOL, it supports anything...
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
new file mode 100644
index 0000000..426cda0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+
+/**
+ * DATETIME_PLUS operation.
+ * Calcite converts 'TIMESTAMPADD(..)' or 'DATE + INTERVAL' from the user input
+ * into DATETIME_PLUS.
+ *
+ * <p>Input and output are expected to be of type TIMESTAMP.
+ */
+public class BeamSqlDatetimePlusExpression extends BeamSqlExpression {
+
+ private static final Set<SqlTypeName> SUPPORTED_INTERVAL_TYPES = ImmutableSet.of(
+ SqlTypeName.INTERVAL_SECOND,
+ SqlTypeName.INTERVAL_MINUTE,
+ SqlTypeName.INTERVAL_HOUR,
+ SqlTypeName.INTERVAL_DAY,
+ SqlTypeName.INTERVAL_MONTH,
+ SqlTypeName.INTERVAL_YEAR);
+
+ public BeamSqlDatetimePlusExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.TIMESTAMP);
+ }
+
+ /**
+ * Requires exactly 2 operands. The first should be a timestamp, the second an interval.
+ */
+ @Override
+ public boolean accept() {
+ return operands.size() == 2
+ && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+ && SUPPORTED_INTERVAL_TYPES.contains(operands.get(1).getOutputType());
+ }
+
+ /**
+ * Adds interval to the timestamp.
+ *
+ * <p>Interval has a value of 'multiplier * TimeUnit.multiplier'.
+ *
+ * <p>For example, '3 years' is going to have a type of INTERVAL_YEAR, and a value of 36.
+ * And '2 minutes' is going to be an INTERVAL_MINUTE with a value of 120000. This is the way
+ * Calcite handles interval expressions, and {@link BeamSqlIntervalMultiplyExpression} also works
+ * the same way.
+ */
+ @Override
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ DateTime timestamp = getTimestampOperand(inputRow, window);
+ BeamSqlPrimitive intervalOperandPrimitive = getIntervalOperand(inputRow, window);
+ SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType();
+ int intervalMultiplier = getIntervalMultiplier(intervalOperandPrimitive);
+
+ DateTime newDate = addInterval(timestamp, intervalOperandType, intervalMultiplier);
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, newDate.toDate());
+ }
+
+ private int getIntervalMultiplier(BeamSqlPrimitive intervalOperandPrimitive) {
+ BigDecimal intervalOperandValue = intervalOperandPrimitive.getDecimal();
+ BigDecimal multiplier = intervalOperandValue.divide(
+ timeUnitInternalMultiplier(intervalOperandPrimitive.getOutputType()),
+ BigDecimal.ROUND_CEILING);
+ return multiplier.intValueExact();
+ }
+
+ private BeamSqlPrimitive getIntervalOperand(BeamRecord inputRow, BoundedWindow window) {
+ return findExpressionOfType(operands, SUPPORTED_INTERVAL_TYPES).get()
+ .evaluate(inputRow, window);
+ }
+
+ private DateTime getTimestampOperand(BeamRecord inputRow, BoundedWindow window) {
+ BeamSqlPrimitive timestampOperandPrimitive =
+ findExpressionOfType(operands, SqlTypeName.TIMESTAMP).get().evaluate(inputRow, window);
+ return new DateTime(timestampOperandPrimitive.getDate());
+ }
+
+ private DateTime addInterval(
+ DateTime dateTime, SqlTypeName intervalType, int numberOfIntervals) {
+
+ switch (intervalType) {
+ case INTERVAL_SECOND:
+ return dateTime.plusSeconds(numberOfIntervals);
+ case INTERVAL_MINUTE:
+ return dateTime.plusMinutes(numberOfIntervals);
+ case INTERVAL_HOUR:
+ return dateTime.plusHours(numberOfIntervals);
+ case INTERVAL_DAY:
+ return dateTime.plusDays(numberOfIntervals);
+ case INTERVAL_MONTH:
+ return dateTime.plusMonths(numberOfIntervals);
+ case INTERVAL_YEAR:
+ return dateTime.plusYears(numberOfIntervals);
+ default:
+ throw new IllegalArgumentException("Adding "
+ + intervalType.getName() + " to date is not supported");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
new file mode 100644
index 0000000..f4ddf71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+
+import com.google.common.base.Optional;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Multiplication operator for intervals.
+ * For example, it allows expressing things like '3 years'.
+ *
+ * <p>One use case of this is implementation of TIMESTAMPADD().
+ * Calcite converts TIMESTAMPADD(interval, multiplier, date) into
+ * DATETIME_PLUS(date, multiplier * interval).
+ * The 'multiplier * interval' part is what this class implements. It's not a regular
+ * numerical multiplication because the return type is expected to be an interval, and the value
+ * is expected to use corresponding TimeUnit's internal value (e.g. 12 for YEAR, 60000 for MINUTE).
+ */
+public class BeamSqlIntervalMultiplyExpression extends BeamSqlExpression {
+ public BeamSqlIntervalMultiplyExpression(List<BeamSqlExpression> operands) {
+ super(operands, deduceOutputType(operands));
+ }
+
+ /**
+ * Output type is null if no operands found with matching types.
+ * Execution will later fail when calling accept()
+ */
+ private static SqlTypeName deduceOutputType(List<BeamSqlExpression> operands) {
+ Optional<BeamSqlExpression> intervalOperand =
+ findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES);
+
+ return intervalOperand.isPresent()
+ ? intervalOperand.get().getOutputType()
+ : null;
+ }
+
+ /**
+ * Requires exactly 2 operands. One should be an integer, the other an interval.
+ */
+ @Override
+ public boolean accept() {
+ return operands.size() == 2
+ && findExpressionOfType(operands, SqlTypeName.INTEGER).isPresent()
+ && findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).isPresent();
+ }
+ /**
+ * Evaluates the number of times the interval should be repeated, times the TimeUnit multiplier.
+ * For example, for '3 * YEAR' this will return an object with type INTERVAL_YEAR and value 36.
+ *
+ * <p>This is due to the fact that TimeUnit has different internal multipliers for each interval,
+ * e.g. YEAR is 12, but MINUTE is 60000. When Calcite parses SQL interval literals, it returns
+ * those internal multipliers. This means we need to do similar thing, so that this multiplication
+ * expression behaves the same way as literal interval expression.
+ *
+ * <p>That is, we need to make sure that this:
+ * "TIMESTAMP '1984-04-19 01:02:03' + INTERVAL '2' YEAR"
+ * is equivalent to this:
+ * "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1984-04-19 01:02:03')"
+ */
+ @Override
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ BeamSqlPrimitive intervalOperandPrimitive =
+ findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).get().evaluate(inputRow, window);
+ SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType();
+
+ BeamSqlPrimitive integerOperandPrimitive =
+ findExpressionOfType(operands, SqlTypeName.INTEGER).get().evaluate(inputRow, window);
+ BigDecimal integerOperandValue = new BigDecimal(integerOperandPrimitive.getInteger());
+
+ BigDecimal multiplicationResult =
+ integerOperandValue.multiply(
+ timeUnitInternalMultiplier(intervalOperandType));
+
+ return BeamSqlPrimitive.of(outputType, multiplicationResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
new file mode 100644
index 0000000..b432d20
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import java.math.BigDecimal;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utils to convert between Calcite's TimeUnit and Sql intervals.
+ */
+public abstract class TimeUnitUtils {
+
+ /**
+ * @return internal multiplier of a TimeUnit, e.g. YEAR is 12, MINUTE is 60000
+ * @throws IllegalArgumentException if interval type is not supported
+ */
+ public static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) {
+ switch (sqlIntervalType) {
+ case INTERVAL_SECOND:
+ return TimeUnit.SECOND.multiplier;
+ case INTERVAL_MINUTE:
+ return TimeUnit.MINUTE.multiplier;
+ case INTERVAL_HOUR:
+ return TimeUnit.HOUR.multiplier;
+ case INTERVAL_DAY:
+ return TimeUnit.DAY.multiplier;
+ case INTERVAL_MONTH:
+ return TimeUnit.MONTH.multiplier;
+ case INTERVAL_YEAR:
+ return TimeUnit.YEAR.multiplier;
+ default:
+ throw new IllegalArgumentException("Interval " + sqlIntervalType
+ + " cannot be converted to TimeUnit");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
new file mode 100644
index 0000000..1ab703e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
@@ -0,0 +1,46 @@
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+import com.google.common.base.Optional;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utils to help with SqlTypes.
+ */
+public class SqlTypeUtils {
+ /**
+ * Finds an operand with provided type.
+ * Returns Optional.absent() if no operand found with matching type
+ */
+ public static Optional<BeamSqlExpression> findExpressionOfType(
+ List<BeamSqlExpression> operands, SqlTypeName type) {
+
+ for (BeamSqlExpression operand : operands) {
+ if (type.equals(operand.getOutputType())) {
+ return Optional.of(operand);
+ }
+ }
+
+ return Optional.absent();
+ }
+
+ /**
+ * Finds an operand with the type in typesToFind.
+ * Returns Optional.absent() if no operand found with matching type
+ */
+ public static Optional<BeamSqlExpression> findExpressionOfType(
+ List<BeamSqlExpression> operands, Collection<SqlTypeName> typesToFind) {
+
+ for (BeamSqlExpression operand : operands) {
+ if (typesToFind.contains(operand.getOutputType())) {
+ return Optional.of(operand);
+ }
+ }
+
+ return Optional.absent();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index f350087..c4583ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
+
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
@@ -40,7 +41,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
@@ -57,12 +60,15 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Assert;
import org.junit.Test;
@@ -412,5 +418,29 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
);
exp = BeamSqlFnExecutor.buildExpression(rexNode);
assertTrue(exp instanceof BeamSqlCurrentTimestampExpression);
+
+ // DATETIME_PLUS
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.DATETIME_PLUS,
+ Arrays.<RexNode>asList(
+ rexBuilder.makeDateLiteral(calendar),
+ rexBuilder.makeIntervalLiteral(
+ new BigDecimal(10),
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO))
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlDatetimePlusExpression);
+
+ // * for intervals
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY,
+ Arrays.<RexNode>asList(
+ rexBuilder.makeExactLiteral(new BigDecimal(1)),
+ rexBuilder.makeIntervalLiteral(
+ new BigDecimal(10),
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO))
+ )
+ );
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlIntervalMultiplyExpression);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
index 0e57404..cb0b6ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
@@ -22,13 +22,14 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
+
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
/**
* Base class for all date related expression test.
*/
public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
- protected long str2LongTime(String dateStr) {
+ static long str2LongTime(String dateStr) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date date = format.parse(dateStr);
@@ -38,7 +39,7 @@ public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
}
}
- protected Date str2DateTime(String dateStr) {
+ static Date str2DateTime(String dateStr) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
format.setTimeZone(TimeZone.getTimeZone("GMT"));
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
new file mode 100644
index 0000000..57e709f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlDatetimePlusExpression}: operand validation and evaluation.
+ */
+public class BeamSqlDatetimePlusExpressionTest extends BeamSqlDateExpressionTestBase {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final BeamRecord NULL_INPUT_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+ private static final Date DATE = str2DateTime("1984-04-19 01:02:03");
+
+ private static final Date DATE_PLUS_15_SECONDS = new DateTime(DATE).plusSeconds(15).toDate();
+ private static final Date DATE_PLUS_10_MINUTES = new DateTime(DATE).plusMinutes(10).toDate();
+ private static final Date DATE_PLUS_7_HOURS = new DateTime(DATE).plusHours(7).toDate();
+ private static final Date DATE_PLUS_3_DAYS = new DateTime(DATE).plusDays(3).toDate();
+ private static final Date DATE_PLUS_2_MONTHS = new DateTime(DATE).plusMonths(2).toDate();
+ private static final Date DATE_PLUS_11_YEARS = new DateTime(DATE).plusYears(11).toDate();
+
+ private static final BeamSqlExpression SQL_INTERVAL_15_SECONDS =
+ interval(SqlTypeName.INTERVAL_SECOND, 15);
+ private static final BeamSqlExpression SQL_INTERVAL_10_MINUTES =
+ interval(SqlTypeName.INTERVAL_MINUTE, 10);
+ private static final BeamSqlExpression SQL_INTERVAL_7_HOURS =
+ interval(SqlTypeName.INTERVAL_HOUR, 7);
+ private static final BeamSqlExpression SQL_INTERVAL_3_DAYS =
+ interval(SqlTypeName.INTERVAL_DAY, 3);
+ private static final BeamSqlExpression SQL_INTERVAL_2_MONTHS =
+ interval(SqlTypeName.INTERVAL_MONTH, 2);
+ private static final BeamSqlExpression SQL_INTERVAL_4_MONTHS =
+ interval(SqlTypeName.INTERVAL_MONTH, 4);
+ private static final BeamSqlExpression SQL_INTERVAL_11_YEARS =
+ interval(SqlTypeName.INTERVAL_YEAR, 11);
+
+ private static final BeamSqlExpression SQL_TIMESTAMP =
+ BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE);
+
+ @Test public void testHappyPath_outputTypeAndAccept() {
+ BeamSqlExpression plusExpression = dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS);
+
+ assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+ assertTrue(plusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptThreeOperands() {
+ BeamSqlDatetimePlusExpression plusExpression =
+ dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS);
+
+ assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+ assertFalse(plusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptWithoutTimestampOperand() {
+ BeamSqlDatetimePlusExpression plusExpression =
+ dateTimePlus(SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS);
+
+ assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+ assertFalse(plusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptWithoutIntervalOperand() {
+ BeamSqlDatetimePlusExpression plusExpression =
+ dateTimePlus(SQL_TIMESTAMP, SQL_TIMESTAMP);
+
+ assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+ assertFalse(plusExpression.accept());
+ }
+
+ @Test public void testEvaluate() {
+ assertEquals(DATE_PLUS_15_SECONDS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_15_SECONDS));
+ assertEquals(DATE_PLUS_10_MINUTES, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_10_MINUTES));
+ assertEquals(DATE_PLUS_7_HOURS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_7_HOURS));
+ assertEquals(DATE_PLUS_3_DAYS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS));
+ assertEquals(DATE_PLUS_2_MONTHS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_2_MONTHS));
+ assertEquals(DATE_PLUS_11_YEARS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_11_YEARS));
+ }
+
+ @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
+ thrown.expect(UnsupportedOperationException.class);
+
+ // INTERVAL_YEAR_MONTH is not one of the single-unit intervals covered by testEvaluate.
+ evalDatetimePlus(SQL_TIMESTAMP, BeamSqlPrimitive.of(SqlTypeName.INTERVAL_YEAR_MONTH, 3));
+ }
+
+ private static Date evalDatetimePlus(BeamSqlExpression date, BeamSqlExpression interval) {
+ return dateTimePlus(date, interval).evaluate(NULL_INPUT_ROW, NULL_WINDOW).getDate();
+ }
+
+ private static BeamSqlDatetimePlusExpression dateTimePlus(BeamSqlExpression ... operands) {
+ return new BeamSqlDatetimePlusExpression(Arrays.asList(operands));
+ }
+
+ private static BeamSqlExpression interval(SqlTypeName type, int multiplier) {
+ // The interval value is the unit count scaled by that unit's internal multiplier.
+ return BeamSqlPrimitive.of(
+ type, timeUnitInternalMultiplier(type).multiply(new BigDecimal(multiplier)));
+ }
+
+ private static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) {
+ switch (sqlIntervalType) {
+ case INTERVAL_SECOND:
+ return TimeUnit.SECOND.multiplier;
+ case INTERVAL_MINUTE:
+ return TimeUnit.MINUTE.multiplier;
+ case INTERVAL_HOUR:
+ return TimeUnit.HOUR.multiplier;
+ case INTERVAL_DAY:
+ return TimeUnit.DAY.multiplier;
+ case INTERVAL_MONTH:
+ return TimeUnit.MONTH.multiplier;
+ case INTERVAL_YEAR:
+ return TimeUnit.YEAR.multiplier;
+ default:
+ throw new IllegalArgumentException("Interval " + sqlIntervalType
+ + " cannot be converted to TimeUnit");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
new file mode 100644
index 0000000..0c91f40
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlIntervalMultiplyExpression.
+ */
+public class BeamSqlIntervalMultiplyExpressionTest {
+ private static final BeamRecord NULL_INPUT_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+ private static final BigDecimal DECIMAL_THREE = new BigDecimal(3);
+ private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4);
+
+ private static final BeamSqlExpression SQL_INTERVAL_DAY =
+ BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE);
+
+ private static final BeamSqlExpression SQL_INTERVAL_MONTH =
+ BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR);
+
+ private static final BeamSqlExpression SQL_INTEGER_FOUR =
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4);
+
+ private static final BeamSqlExpression SQL_INTEGER_FIVE =
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5);
+
+ @Test public void testHappyPath_outputTypeAndAccept() {
+ BeamSqlExpression multiplyExpression =
+ newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR);
+
+ assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+ assertTrue(multiplyExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptTreeOperands() {
+ BeamSqlIntervalMultiplyExpression multiplyExpression =
+ newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FIVE, SQL_INTEGER_FOUR);
+
+ assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+ assertFalse(multiplyExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptWithoutIntervalOperand() {
+ BeamSqlIntervalMultiplyExpression multiplyExpression =
+ newMultiplyExpression(SQL_INTEGER_FOUR, SQL_INTEGER_FIVE);
+
+ assertNull(multiplyExpression.getOutputType());
+ assertFalse(multiplyExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptWithoutIntegerOperand() {
+ BeamSqlIntervalMultiplyExpression multiplyExpression =
+ newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTERVAL_MONTH);
+
+ assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+ assertFalse(multiplyExpression.accept());
+ }
+
+ @Test public void testEvaluate_integerOperand() {
+ BeamSqlIntervalMultiplyExpression multiplyExpression =
+ newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR);
+
+ BeamSqlPrimitive multiplicationResult =
+ multiplyExpression.evaluate(NULL_INPUT_ROW, NULL_WINDOW);
+
+ BigDecimal expectedResult =
+ DECIMAL_FOUR.multiply(timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY));
+
+ assertEquals(expectedResult, multiplicationResult.getDecimal());
+ assertEquals(SqlTypeName.INTERVAL_DAY, multiplicationResult.getOutputType());
+ }
+
+ private BeamSqlIntervalMultiplyExpression newMultiplyExpression(BeamSqlExpression ... operands) {
+ return new BeamSqlIntervalMultiplyExpression(Arrays.asList(operands));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
new file mode 100644
index 0000000..91552ae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link TimeUnitUtils}.
+ */
+public class TimeUnitUtilsTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test public void testReturnsInternalTimeUnitMultipliers() {
+ assertEquals(TimeUnit.SECOND.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_SECOND));
+ assertEquals(TimeUnit.MINUTE.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MINUTE));
+ assertEquals(TimeUnit.HOUR.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_HOUR));
+ assertEquals(TimeUnit.DAY.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY));
+ assertEquals(TimeUnit.MONTH.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MONTH));
+ assertEquals(TimeUnit.YEAR.multiplier,
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_YEAR));
+ }
+
+ @Test public void testThrowsForUnsupportedIntervalType() {
+ thrown.expect(IllegalArgumentException.class);
+ TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY_MINUTE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
new file mode 100644
index 0000000..1a14256
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Unit tests for the expression lookup helpers in {@link SqlTypeUtils}.
+ */
+public class SqlTypeUtilsTest {
+ private static final BigDecimal DECIMAL_THREE = new BigDecimal(3);
+ private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4);
+
+ private static final List<BeamSqlExpression> EXPRESSIONS = Arrays.<BeamSqlExpression> asList(
+ BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE),
+ BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+
+ @Test public void testFindExpressionOfType_success() {
+ Optional<BeamSqlExpression> found = findExpressionOfType(EXPRESSIONS, SqlTypeName.INTEGER);
+
+ assertTrue(found.isPresent());
+ assertEquals(SqlTypeName.INTEGER, found.get().getOutputType());
+ }
+
+ @Test public void testFindExpressionOfType_failure() {
+ Optional<BeamSqlExpression> found = findExpressionOfType(EXPRESSIONS, SqlTypeName.VARCHAR);
+
+ assertFalse(found.isPresent());
+ }
+
+ @Test public void testFindExpressionOfTypes_success() {
+ Optional<BeamSqlExpression> found = findExpressionOfType(EXPRESSIONS, SqlTypeName.INT_TYPES);
+
+ assertTrue(found.isPresent());
+ assertEquals(SqlTypeName.INTEGER, found.get().getOutputType());
+ }
+
+ @Test public void testFindExpressionOfTypes_failure() {
+ // EXPRESSIONS holds only interval and integer operands.
+ Optional<BeamSqlExpression> found = findExpressionOfType(EXPRESSIONS, SqlTypeName.CHAR_TYPES);
+
+ assertFalse(found.isPresent());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 1fdb35f..6937a18 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Date;
import java.util.Iterator;
+
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -35,7 +36,7 @@ import org.junit.Test;
*/
public class BeamSqlDateFunctionsIntegrationTest
extends BeamSqlBuiltinFunctionsIntegrationTestBase {
- @Test public void testDateTimeFunctions() throws Exception {
+ @Test public void testBasicDateTimeFunctions() throws Exception {
ExpressionChecker checker = new ExpressionChecker()
.addExpr("EXTRACT(YEAR FROM ts)", 1986L)
.addExpr("YEAR(ts)", 1986L)
@@ -54,6 +55,42 @@ public class BeamSqlDateFunctionsIntegrationTest
checker.buildRunAndCheck();
}
+ @Test public void testDatetimePlusFunction() throws Exception {
+ // One TIMESTAMPADD expression per unit, from SECOND up to YEAR, on a TIMESTAMP literal.
+ new ExpressionChecker()
+ .addExpr("TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')",
+ parseDate("1984-04-19 01:02:06"))
+ .addExpr("TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')",
+ parseDate("1984-04-19 01:05:03"))
+ .addExpr("TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')",
+ parseDate("1984-04-19 04:02:03"))
+ .addExpr("TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')",
+ parseDate("1984-04-22 01:02:03"))
+ .addExpr("TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')",
+ parseDate("1984-03-19 01:02:03"))
+ .addExpr("TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')",
+ parseDate("1987-01-19 01:02:03"))
+ .buildRunAndCheck();
+ }
+
+ @Test public void testDatetimeInfixPlus() throws Exception {
+ // Same units as the TIMESTAMPADD test, written as infix "TIMESTAMP + INTERVAL" expressions.
+ new ExpressionChecker()
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND",
+ parseDate("1984-01-19 01:02:06"))
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE",
+ parseDate("1984-01-19 01:04:03"))
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR",
+ parseDate("1984-01-19 03:02:03"))
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY",
+ parseDate("1984-01-21 01:02:03"))
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH",
+ parseDate("1984-03-19 01:02:03"))
+ .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
+ parseDate("1986-01-19 01:02:03"))
+ .buildRunAndCheck();
+ }
+
@Test public void testDateTimeFunctions_currentTime() throws Exception {
String sql = "SELECT "
+ "LOCALTIME as l,"