You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/07 16:54:28 UTC

[12/50] [abbrv] beam git commit: [BEAM-2203] Implement TIMESTAMPADD

[BEAM-2203] Implement TIMESTAMPADD

Add support for TIMESTAMPADD(interval, multiplier, TIMESTAMP)

fixup! [BEAM-2203] Implement TIMESTAMPADD


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/820f8aff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/820f8aff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/820f8aff

Branch: refs/heads/mr-runner
Commit: 820f8aff944d1eda69c9226086848d8b39fcf62f
Parents: a33c717
Author: Anton Kedin <ke...@kedin-macbookpro.roam.corp.google.com>
Authored: Fri Oct 27 10:15:08 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Wed Nov 1 16:29:07 2017 -0700

----------------------------------------------------------------------
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |  19 ++-
 .../interpreter/operator/BeamSqlPrimitive.java  |   8 +-
 .../date/BeamSqlDatetimePlusExpression.java     | 129 +++++++++++++++
 .../date/BeamSqlIntervalMultiplyExpression.java | 103 ++++++++++++
 .../operator/date/TimeUnitUtils.java            |  54 +++++++
 .../extensions/sql/impl/utils/SqlTypeUtils.java |  46 ++++++
 .../impl/interpreter/BeamSqlFnExecutorTest.java |  30 ++++
 .../date/BeamSqlDateExpressionTestBase.java     |   5 +-
 .../date/BeamSqlDatetimePlusExpressionTest.java | 155 +++++++++++++++++++
 .../BeamSqlIntervalMultiplyExpressionTest.java  | 107 +++++++++++++
 .../operator/date/TimeUnitUtilsTest.java        |  54 +++++++
 .../sql/impl/utils/SqlTypeUtilsTest.java        |  76 +++++++++
 .../BeamSqlDateFunctionsIntegrationTest.java    |  39 ++++-
 13 files changed, 818 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 8f9797b..8770055 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
+
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
@@ -49,7 +50,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
@@ -143,7 +146,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
         // NlsString is not serializable, we need to convert
         // it to string explicitly.
         return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
-      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+      } else if (isDateNode(type, value)) {
         // does this actually make sense?
         // Calcite actually treat Calendar as the java type of Date Literal
         return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
@@ -235,7 +238,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
           ret = new BeamSqlMinusExpression(subExps);
           break;
         case "*":
-          ret = new BeamSqlMultiplyExpression(subExps);
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            ret = new BeamSqlMultiplyExpression(subExps);
+          } else {
+            ret = new BeamSqlIntervalMultiplyExpression(subExps);
+          }
           break;
         case "/":
         case "/INT":
@@ -369,6 +376,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
         case "CURRENT_DATE":
           return new BeamSqlCurrentDateExpression();
 
+        case "DATETIME_PLUS":
+          return new BeamSqlDatetimePlusExpression(subExps);
+
 
         case "CASE":
           ret = new BeamSqlCaseExpression(subExps);
@@ -423,6 +433,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
     return ret;
   }
 
+  private static boolean isDateNode(SqlTypeName type, Object value) {
+    return (type == SqlTypeName.DATE || type == SqlTypeName.TIMESTAMP)
+        && value instanceof Calendar;
+  }
+
   @Override
   public void prepare() {
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 9175caa..21cbc80 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -133,9 +134,12 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
     case TIMESTAMP:
     case DATE:
       return value instanceof Date;
-    case INTERVAL_HOUR:
-      return value instanceof BigDecimal;
+    case INTERVAL_SECOND:
     case INTERVAL_MINUTE:
+    case INTERVAL_HOUR:
+    case INTERVAL_DAY:
+    case INTERVAL_MONTH:
+    case INTERVAL_YEAR:
       return value instanceof BigDecimal;
     case SYMBOL:
       // for SYMBOL, it supports anything...

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
new file mode 100644
index 0000000..426cda0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+
+/**
+ * DATETIME_PLUS operation.
+ * Calcite converts 'TIMESTAMPADD(..)' or 'DATE + INTERVAL' from the user input
+ * into DATETIME_PLUS.
+ *
+ * <p>Input and output are expected to be of type TIMESTAMP.
+ */
+public class BeamSqlDatetimePlusExpression extends BeamSqlExpression {
+
+  private static final Set<SqlTypeName> SUPPORTED_INTERVAL_TYPES = ImmutableSet.of(
+      SqlTypeName.INTERVAL_SECOND,
+      SqlTypeName.INTERVAL_MINUTE,
+      SqlTypeName.INTERVAL_HOUR,
+      SqlTypeName.INTERVAL_DAY,
+      SqlTypeName.INTERVAL_MONTH,
+      SqlTypeName.INTERVAL_YEAR);
+
+  public BeamSqlDatetimePlusExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+
+  /**
+   * Requires exactly 2 operands. The first must be a timestamp, the second an interval.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+        && SUPPORTED_INTERVAL_TYPES.contains(operands.get(1).getOutputType());
+  }
+
+  /**
+   * Adds interval to the timestamp.
+   *
+   * <p>Interval has a value of 'multiplier * TimeUnit.multiplier'.
+   *
+   * <p>For example, '3 years' is going to have a type of INTERVAL_YEAR, and a value of 36.
+   * And '2 minutes' is going to be an INTERVAL_MINUTE with a value of 120000. This is the way
+   * Calcite handles interval expressions, and {@link BeamSqlIntervalMultiplyExpression} also works
+   * the same way.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    DateTime timestamp = getTimestampOperand(inputRow, window);
+    BeamSqlPrimitive intervalOperandPrimitive = getIntervalOperand(inputRow, window);
+    SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType();
+    int intervalMultiplier = getIntervalMultiplier(intervalOperandPrimitive);
+
+    DateTime newDate = addInterval(timestamp, intervalOperandType, intervalMultiplier);
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, newDate.toDate());
+  }
+
+  private int getIntervalMultiplier(BeamSqlPrimitive intervalOperandPrimitive) {
+    BigDecimal intervalOperandValue = intervalOperandPrimitive.getDecimal();
+    BigDecimal multiplier = intervalOperandValue.divide(
+        timeUnitInternalMultiplier(intervalOperandPrimitive.getOutputType()),
+        BigDecimal.ROUND_CEILING);
+    return multiplier.intValueExact();
+  }
+
+  private BeamSqlPrimitive getIntervalOperand(BeamRecord inputRow, BoundedWindow window) {
+    return findExpressionOfType(operands, SUPPORTED_INTERVAL_TYPES).get()
+        .evaluate(inputRow, window);
+  }
+
+  private DateTime getTimestampOperand(BeamRecord inputRow, BoundedWindow window) {
+    BeamSqlPrimitive timestampOperandPrimitive =
+        findExpressionOfType(operands, SqlTypeName.TIMESTAMP).get().evaluate(inputRow, window);
+    return new DateTime(timestampOperandPrimitive.getDate());
+  }
+
+  private DateTime addInterval(
+      DateTime dateTime, SqlTypeName intervalType, int numberOfIntervals) {
+
+    switch (intervalType) {
+      case INTERVAL_SECOND:
+        return dateTime.plusSeconds(numberOfIntervals);
+      case INTERVAL_MINUTE:
+        return dateTime.plusMinutes(numberOfIntervals);
+      case INTERVAL_HOUR:
+        return dateTime.plusHours(numberOfIntervals);
+      case INTERVAL_DAY:
+        return dateTime.plusDays(numberOfIntervals);
+      case INTERVAL_MONTH:
+        return dateTime.plusMonths(numberOfIntervals);
+      case INTERVAL_YEAR:
+        return dateTime.plusYears(numberOfIntervals);
+      default:
+        throw new IllegalArgumentException("Adding "
+            + intervalType.getName() + " to date is not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
new file mode 100644
index 0000000..f4ddf71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+
+import com.google.common.base.Optional;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Multiplication operator for intervals.
+ * For example, it allows expressing things like '3 years'.
+ *
+ * <p>One use case of this is implementation of TIMESTAMPADD().
+ * Calcite converts TIMESTAMPADD(interval, multiplier, date) into
+ * DATETIME_PLUS(date, multiplier * interval).
+ * The 'multiplier * interval' part is what this class implements. It's not a regular
+ * numerical multiplication because the return type is expected to be an interval, and the value
+ * is expected to use corresponding TimeUnit's internal value (e.g. 12 for YEAR, 60000 for MINUTE).
+ */
+public class BeamSqlIntervalMultiplyExpression extends BeamSqlExpression {
+  public BeamSqlIntervalMultiplyExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands));
+  }
+
+  /**
+   * Output type is null if no operands found with matching types.
+   * Execution will later fail when calling accept()
+   */
+  private static SqlTypeName deduceOutputType(List<BeamSqlExpression> operands) {
+    Optional<BeamSqlExpression> intervalOperand =
+        findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES);
+
+    return intervalOperand.isPresent()
+        ? intervalOperand.get().getOutputType()
+        : null;
+  }
+
+  /**
+   * Requires exactly 2 operands. One should be integer, another should be interval
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2
+        && findExpressionOfType(operands, SqlTypeName.INTEGER).isPresent()
+        && findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).isPresent();
+  }
+  /**
+   * Evaluates the number of times the interval should be repeated, times the TimeUnit multiplier.
+   * For example for '3 * YEAR' this will return an object with type INTERVAL_YEAR and value 36.
+   *
+   * <p>This is due to the fact that TimeUnit has different internal multipliers for each interval,
+   * e.g. YEAR is 12, but MINUTE is 60000. When Calcite parses SQL interval literals, it returns
+   * those internal multipliers. This means we need to do similar thing, so that this multiplication
+   * expression behaves the same way as literal interval expression.
+   *
+   * <p>That is, we need to make sure that this:
+   *   "TIMESTAMP '1984-04-19 01:02:03' + INTERVAL '2' YEAR"
+   * is equivalent to this:
+   *   "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1984-04-19 01:02:03')"
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    BeamSqlPrimitive intervalOperandPrimitive =
+        findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).get().evaluate(inputRow, window);
+    SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType();
+
+    BeamSqlPrimitive integerOperandPrimitive =
+        findExpressionOfType(operands, SqlTypeName.INTEGER).get().evaluate(inputRow, window);
+    BigDecimal integerOperandValue = new BigDecimal(integerOperandPrimitive.getInteger());
+
+    BigDecimal multiplicationResult =
+        integerOperandValue.multiply(
+            timeUnitInternalMultiplier(intervalOperandType));
+
+    return BeamSqlPrimitive.of(outputType, multiplicationResult);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
new file mode 100644
index 0000000..b432d20
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import java.math.BigDecimal;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utils to convert between Calcite's TimeUnit and Sql intervals.
+ */
+public abstract class TimeUnitUtils {
+
+  /**
+   * @return internal multiplier of a TimeUnit, e.g. YEAR is 12, MINUTE is 60000
+   * @throws IllegalArgumentException if interval type is not supported
+   */
+  public static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) {
+    switch (sqlIntervalType) {
+      case INTERVAL_SECOND:
+        return TimeUnit.SECOND.multiplier;
+      case INTERVAL_MINUTE:
+        return TimeUnit.MINUTE.multiplier;
+      case INTERVAL_HOUR:
+        return TimeUnit.HOUR.multiplier;
+      case INTERVAL_DAY:
+        return TimeUnit.DAY.multiplier;
+      case INTERVAL_MONTH:
+        return TimeUnit.MONTH.multiplier;
+      case INTERVAL_YEAR:
+        return TimeUnit.YEAR.multiplier;
+      default:
+        throw new IllegalArgumentException("Interval " + sqlIntervalType
+            + " cannot be converted to TimeUnit");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
new file mode 100644
index 0000000..1ab703e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java
@@ -0,0 +1,46 @@
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+import com.google.common.base.Optional;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utils to help with SqlTypes.
+ */
+public class SqlTypeUtils {
+  /**
+   * Finds the first operand with the provided type.
+   * Returns Optional.absent() if no operand found with matching type
+   */
+  public static Optional<BeamSqlExpression> findExpressionOfType(
+      List<BeamSqlExpression> operands, SqlTypeName type) {
+
+    for (BeamSqlExpression operand : operands) {
+      if (type.equals(operand.getOutputType())) {
+        return Optional.of(operand);
+      }
+    }
+
+    return Optional.absent();
+  }
+
+  /**
+   * Finds the first operand whose type is in typesToFind.
+   * Returns Optional.absent() if no operand found with matching type
+   */
+  public static Optional<BeamSqlExpression> findExpressionOfType(
+      List<BeamSqlExpression> operands, Collection<SqlTypeName> typesToFind) {
+
+    for (BeamSqlExpression operand : operands) {
+      if (typesToFind.contains(operand.getOutputType())) {
+        return Optional.of(operand);
+      }
+    }
+
+    return Optional.absent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index f350087..c4583ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
+
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
@@ -40,7 +41,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
@@ -57,12 +60,15 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -412,5 +418,29 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
     );
     exp = BeamSqlFnExecutor.buildExpression(rexNode);
     assertTrue(exp instanceof BeamSqlCurrentTimestampExpression);
+
+    // DATETIME_PLUS
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.DATETIME_PLUS,
+        Arrays.<RexNode>asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeIntervalLiteral(
+                new BigDecimal(10),
+                new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO))
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDatetimePlusExpression);
+
+    // * for intervals
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY,
+        Arrays.<RexNode>asList(
+            rexBuilder.makeExactLiteral(new BigDecimal(1)),
+            rexBuilder.makeIntervalLiteral(
+                new BigDecimal(10),
+                new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO))
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlIntervalMultiplyExpression);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
index 0e57404..cb0b6ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
@@ -22,13 +22,14 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
+
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
 
 /**
  * Base class for all date related expression test.
  */
 public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
-  protected long str2LongTime(String dateStr) {
+  static long str2LongTime(String dateStr) {
     SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     try {
       Date date = format.parse(dateStr);
@@ -38,7 +39,7 @@ public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
     }
   }
 
-  protected Date str2DateTime(String dateStr) {
+  static Date str2DateTime(String dateStr) {
     SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     try {
       format.setTimeZone(TimeZone.getTimeZone("GMT"));

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
new file mode 100644
index 0000000..57e709f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests accept(), output type and evaluation of {@link BeamSqlDatetimePlusExpression}.
+ */
+public class BeamSqlDatetimePlusExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static final BeamRecord NULL_INPUT_ROW = null; // passed to evaluate()
+  private static final BoundedWindow NULL_WINDOW = null; // passed to evaluate()
+  private static final Date DATE = str2DateTime("1984-04-19 01:02:03"); // base timestamp
+  // NOTE(review): new DateTime(DATE) uses the default time zone; confirm DST cannot skew these.
+  private static final Date DATE_PLUS_15_SECONDS = new DateTime(DATE).plusSeconds(15).toDate();
+  private static final Date DATE_PLUS_10_MINUTES = new DateTime(DATE).plusMinutes(10).toDate();
+  private static final Date DATE_PLUS_7_HOURS = new DateTime(DATE).plusHours(7).toDate();
+  private static final Date DATE_PLUS_3_DAYS = new DateTime(DATE).plusDays(3).toDate();
+  private static final Date DATE_PLUS_2_MONTHS = new DateTime(DATE).plusMonths(2).toDate();
+  private static final Date DATE_PLUS_11_YEARS = new DateTime(DATE).plusYears(11).toDate();
+
+  private static final BeamSqlExpression SQL_INTERVAL_15_SECONDS =
+      interval(SqlTypeName.INTERVAL_SECOND, 15);
+  private static final BeamSqlExpression SQL_INTERVAL_10_MINUTES =
+      interval(SqlTypeName.INTERVAL_MINUTE, 10);
+  private static final BeamSqlExpression SQL_INTERVAL_7_HOURS =
+      interval(SqlTypeName.INTERVAL_HOUR, 7);
+  private static final BeamSqlExpression SQL_INTERVAL_3_DAYS =
+      interval(SqlTypeName.INTERVAL_DAY, 3);
+  private static final BeamSqlExpression SQL_INTERVAL_2_MONTHS =
+      interval(SqlTypeName.INTERVAL_MONTH, 2);
+  private static final BeamSqlExpression SQL_INTERVAL_4_MONTHS =
+      interval(SqlTypeName.INTERVAL_MONTH, 4);
+  private static final BeamSqlExpression SQL_INTERVAL_11_YEARS =
+      interval(SqlTypeName.INTERVAL_YEAR, 11);
+
+  private static final BeamSqlExpression SQL_TIMESTAMP =
+      BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE);
+
+  @Test public void testHappyPath_outputTypeAndAccept() {
+    BeamSqlExpression plusExpression = dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS);
+
+    assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+    assertTrue(plusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptTreeOperands() { // "Tree" is a typo for "Three"
+    BeamSqlDatetimePlusExpression plusExpression =
+        dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS);
+
+    assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+    assertFalse(plusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptWithoutTimestampOperand() {
+    BeamSqlDatetimePlusExpression plusExpression =
+        dateTimePlus(SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS);
+
+    assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+    assertFalse(plusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptWithoutIntervalOperand() {
+    BeamSqlDatetimePlusExpression plusExpression =
+        dateTimePlus(SQL_TIMESTAMP, SQL_TIMESTAMP);
+
+    assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType());
+    assertFalse(plusExpression.accept());
+  }
+
+  @Test public void testEvaluate() {
+    assertEquals(DATE_PLUS_15_SECONDS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_15_SECONDS));
+    assertEquals(DATE_PLUS_10_MINUTES, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_10_MINUTES));
+    assertEquals(DATE_PLUS_7_HOURS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_7_HOURS));
+    assertEquals(DATE_PLUS_3_DAYS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS));
+    assertEquals(DATE_PLUS_2_MONTHS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_2_MONTHS));
+    assertEquals(DATE_PLUS_11_YEARS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_11_YEARS));
+  }
+
+  @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
+    thrown.expect(UnsupportedOperationException.class); // registered before the throwing call
+
+    BeamSqlPrimitive unsupportedInterval = BeamSqlPrimitive.of(SqlTypeName.INTERVAL_YEAR_MONTH, 3);
+    evalDatetimePlus(SQL_TIMESTAMP, unsupportedInterval);
+  }
+
+  private static Date evalDatetimePlus(BeamSqlExpression date, BeamSqlExpression interval) {
+    return dateTimePlus(date, interval).evaluate(NULL_INPUT_ROW, NULL_WINDOW).getDate();
+  }
+
+  private static BeamSqlDatetimePlusExpression dateTimePlus(BeamSqlExpression ... operands) {
+    return new BeamSqlDatetimePlusExpression(Arrays.asList(operands));
+  }
+
+  private static BeamSqlExpression interval(SqlTypeName type, int multiplier) {
+    return BeamSqlPrimitive.of(type,
+        timeUnitInternalMultiplier(type) // TimeUnit multiplier for this interval type
+            .multiply(new BigDecimal(multiplier))); // times the requested number of units
+  }
+
+  private static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) {
+    switch (sqlIntervalType) { // NOTE(review): looks like a copy of TimeUnitUtils' mapping - confirm
+      case INTERVAL_SECOND:
+        return TimeUnit.SECOND.multiplier;
+      case INTERVAL_MINUTE:
+        return TimeUnit.MINUTE.multiplier;
+      case INTERVAL_HOUR:
+        return TimeUnit.HOUR.multiplier;
+      case INTERVAL_DAY:
+        return TimeUnit.DAY.multiplier;
+      case INTERVAL_MONTH:
+        return TimeUnit.MONTH.multiplier;
+      case INTERVAL_YEAR:
+        return TimeUnit.YEAR.multiplier;
+      default: // every other interval type is rejected
+        throw new IllegalArgumentException("Interval " + sqlIntervalType
+            + " cannot be converted to TimeUnit");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
new file mode 100644
index 0000000..0c91f40
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests accept(), output type and evaluation of {@link BeamSqlIntervalMultiplyExpression}.
+ */
+public class BeamSqlIntervalMultiplyExpressionTest {
+  private static final BeamRecord NULL_INPUT_ROW = null; // passed to evaluate()
+  private static final BoundedWindow NULL_WINDOW = null; // passed to evaluate()
+  private static final BigDecimal DECIMAL_THREE = new BigDecimal(3);
+  private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4);
+
+  private static final BeamSqlExpression SQL_INTERVAL_DAY =
+      BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE);
+
+  private static final BeamSqlExpression SQL_INTERVAL_MONTH =
+      BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR);
+
+  private static final BeamSqlExpression SQL_INTEGER_FOUR =
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4);
+
+  private static final BeamSqlExpression SQL_INTEGER_FIVE =
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5);
+
+  @Test public void testHappyPath_outputTypeAndAccept() {
+    BeamSqlExpression multiplyExpression =
+        newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR);
+
+    assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+    assertTrue(multiplyExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptTreeOperands() { // "Tree" is a typo for "Three"
+    BeamSqlIntervalMultiplyExpression multiplyExpression =
+        newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FIVE, SQL_INTEGER_FOUR);
+
+    assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+    assertFalse(multiplyExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptWithoutIntervalOperand() {
+    BeamSqlIntervalMultiplyExpression multiplyExpression =
+        newMultiplyExpression(SQL_INTEGER_FOUR, SQL_INTEGER_FIVE);
+
+    assertNull(multiplyExpression.getOutputType()); // both operands are INTEGER here
+    assertFalse(multiplyExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptWithoutIntegerOperand() {
+    BeamSqlIntervalMultiplyExpression multiplyExpression =
+        newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTERVAL_MONTH);
+
+    assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType());
+    assertFalse(multiplyExpression.accept());
+  }
+
+  @Test public void testEvaluate_integerOperand() {
+    BeamSqlIntervalMultiplyExpression multiplyExpression =
+        newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR);
+
+    BeamSqlPrimitive multiplicationResult =
+        multiplyExpression.evaluate(NULL_INPUT_ROW, NULL_WINDOW);
+
+    BigDecimal expectedResult = // 4 * DAY multiplier; the interval's value (3) is not a factor
+        DECIMAL_FOUR.multiply(timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY));
+
+    assertEquals(expectedResult, multiplicationResult.getDecimal());
+    assertEquals(SqlTypeName.INTERVAL_DAY, multiplicationResult.getOutputType());
+  }
+
+  private BeamSqlIntervalMultiplyExpression newMultiplyExpression(BeamSqlExpression ... operands) {
+    return new BeamSqlIntervalMultiplyExpression(Arrays.asList(operands));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
new file mode 100644
index 0000000..91552ae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link TimeUnitUtils}: interval types mapped to Calcite TimeUnit multipliers.
+ */
+public class TimeUnitUtilsTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test public void testReturnsInternalTimeUnitMultipliers() { // SECOND through YEAR
+    assertEquals(TimeUnit.SECOND.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_SECOND));
+    assertEquals(TimeUnit.MINUTE.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MINUTE));
+    assertEquals(TimeUnit.HOUR.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_HOUR));
+    assertEquals(TimeUnit.DAY.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY));
+    assertEquals(TimeUnit.MONTH.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MONTH));
+    assertEquals(TimeUnit.YEAR.multiplier,
+        TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_YEAR));
+  }
+
+  @Test public void testThrowsForUnsupportedIntervalType() {
+    thrown.expect(IllegalArgumentException.class); // registered before the throwing call
+    TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY_MINUTE); // expected to throw
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
new file mode 100644
index 0000000..1a14256
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SqlTypeUtils}: {@code findExpressionOfType} with one type and with type groups.
+ */
+public class SqlTypeUtilsTest {
+  private static final BigDecimal DECIMAL_THREE = new BigDecimal(3);
+  private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4);
+
+  private static final List<BeamSqlExpression> EXPRESSIONS = Arrays.<BeamSqlExpression> asList(
+      BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE),
+      BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR),
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4),
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+
+  @Test public void testFindExpressionOfType_success() {
+    Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.INTEGER);
+
+    assertTrue(typeName.isPresent()); // despite its name, "typeName" holds the matched expression
+    assertEquals(SqlTypeName.INTEGER, typeName.get().getOutputType());
+  }
+
+  @Test public void testFindExpressionOfType_failure() {
+    Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.VARCHAR);
+
+    assertFalse(typeName.isPresent()); // EXPRESSIONS contains no VARCHAR entry
+  }
+
+  @Test public void testFindExpressionOfTypes_success() {
+    Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.INT_TYPES);
+
+    assertTrue(typeName.isPresent());
+    assertEquals(SqlTypeName.INTEGER, typeName.get().getOutputType()); // INTEGER matched INT_TYPES
+  }
+
+  @Test public void testFindExpressionOfTypes_failure() {
+    Optional<BeamSqlExpression> typeName =
+        findExpressionOfType(EXPRESSIONS, SqlTypeName.CHAR_TYPES);
+
+    assertFalse(typeName.isPresent()); // asserted: nothing in EXPRESSIONS matches CHAR_TYPES
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 1fdb35f..6937a18 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Date;
 import java.util.Iterator;
+
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -35,7 +36,7 @@ import org.junit.Test;
  */
 public class BeamSqlDateFunctionsIntegrationTest
     extends BeamSqlBuiltinFunctionsIntegrationTestBase {
-  @Test public void testDateTimeFunctions() throws Exception {
+  @Test public void testBasicDateTimeFunctions() throws Exception {
     ExpressionChecker checker = new ExpressionChecker()
         .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
         .addExpr("YEAR(ts)", 1986L)
@@ -54,6 +55,42 @@ public class BeamSqlDateFunctionsIntegrationTest
     checker.buildRunAndCheck();
   }
 
+  @Test public void testDatetimePlusFunction() throws Exception { // TIMESTAMPADD per unit
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')",
+            parseDate("1984-04-19 01:02:06"))
+        .addExpr("TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')",
+            parseDate("1984-04-19 01:05:03"))
+        .addExpr("TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')",
+            parseDate("1984-04-19 04:02:03"))
+        .addExpr("TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')",
+            parseDate("1984-04-22 01:02:03"))
+        .addExpr("TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')",
+            parseDate("1984-03-19 01:02:03"))
+        .addExpr("TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')",
+            parseDate("1987-01-19 01:02:03"))
+        ; // ends the fluent addExpr chain
+    checker.buildRunAndCheck();
+  }
+
+  @Test public void testDatetimeInfixPlus() throws Exception { // TIMESTAMP + INTERVAL per unit
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND",
+            parseDate("1984-01-19 01:02:06"))
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE",
+            parseDate("1984-01-19 01:04:03"))
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR",
+            parseDate("1984-01-19 03:02:03"))
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY",
+            parseDate("1984-01-21 01:02:03"))
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH",
+            parseDate("1984-03-19 01:02:03"))
+        .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
+            parseDate("1986-01-19 01:02:03"))
+        ; // ends the fluent addExpr chain
+    checker.buildRunAndCheck();
+  }
+
   @Test public void testDateTimeFunctions_currentTime() throws Exception {
     String sql = "SELECT "
         + "LOCALTIME as l,"