You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/08 06:11:33 UTC

[1/2] beam git commit: [BEAM-2203] Implement 'timestamp - interval'

Repository: beam
Updated Branches:
  refs/heads/master 299429394 -> e0166ceb6


[BEAM-2203] Implement 'timestamp - interval'

Move TIMESTAMPDIFF implementation into BeamSqlTimestampMinusTimestampExpression, add BeamSqlTimestampMinusIntervalExpression


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a63eb21
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a63eb21
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a63eb21

Branch: refs/heads/master
Commit: 0a63eb2195575f6cc5ee228d4edb1ed9571f9f5f
Parents: 29942939
Author: Anton Kedin <ke...@google.com>
Authored: Sun Nov 5 23:26:10 2017 -0800
Committer: James Xu <xu...@gmail.com>
Committed: Wed Nov 8 11:58:34 2017 +0800

----------------------------------------------------------------------
 .../date/BeamSqlDatetimeMinusExpression.java    | 104 ++++-----
 ...BeamSqlTimestampMinusIntervalExpression.java |  83 +++++++
 ...eamSqlTimestampMinusTimestampExpression.java |  97 ++++++++
 .../BeamSqlDatetimeMinusExpressionTest.java     | 189 ++++-----------
 ...SqlTimestampMinusIntervalExpressionTest.java | 163 +++++++++++++
 ...qlTimestampMinusTimestampExpressionTest.java | 233 +++++++++++++++++++
 .../BeamSqlDateFunctionsIntegrationTest.java    |  18 ++
 7 files changed, 691 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
index 6a5cbb1..6948ba1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
@@ -28,80 +28,80 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimi
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
 import org.joda.time.DurationFieldType;
-import org.joda.time.Period;
-import org.joda.time.PeriodType;
 
 /**
  * Infix '-' operation for timestamps.
  *
- * <p>Currently this implementation is specific to how Calcite implements 'TIMESTAMPDIFF(..)'.
- * It converts the TIMESTAMPDIFF() call into infix minus and normalizes it
- * with corresponding TimeUnit's multiplier.
+ * <p>Implements 2 SQL subtraction operations at the moment:
+ * 'timestampdiff(timeunit, timestamp, timestamp)' and 'timestamp - interval'.
  *
- * <p>In addition to this TIMESTAMPDIFF(..) implementation, Calcite also supports infix
- * operations 'interval - interval' and 'timestamp - interval'.
- * These are not implemented yet.
+ * <p>Calcite converts both of the above into an infix '-' expression, with different operands
+ * and return types.
+ *
+ * <p>This class delegates evaluation to the specific implementation of one of the above
+ * operations; see {@link BeamSqlTimestampMinusTimestampExpression}
+ * and {@link BeamSqlTimestampMinusIntervalExpression}.
+ *
+ * <p>Calcite supports one more subtraction kind: 'interval - interval',
+ * but it is not implemented yet.
  */
 public class BeamSqlDatetimeMinusExpression extends BeamSqlExpression {
-  private SqlTypeName intervalType;
 
-  private static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
+  static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
       ImmutableMap.<SqlTypeName, DurationFieldType>builder()
-      .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
-      .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
-      .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
-      .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
-      .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
-      .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
-      .build();
-
-  public BeamSqlDatetimeMinusExpression(
-      List<BeamSqlExpression> operands, SqlTypeName intervalType) {
-    super(operands, SqlTypeName.BIGINT);
-    this.intervalType = intervalType;
+          .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
+          .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
+          .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
+          .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
+          .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
+          .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
+          .build();
+
+  private BeamSqlExpression delegateExpression;
+
+  public BeamSqlDatetimeMinusExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+
+    this.delegateExpression = createDelegateExpression(operands, outputType);
   }
 
-  /**
-   * Requires exactly 2 operands. One should be a timestamp, another an interval
-   */
-  @Override
-  public boolean accept() {
-    return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
-        && operands.size() == 2
-        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
-        && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+  private BeamSqlExpression createDelegateExpression(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
+
+    if (isTimestampMinusTimestamp(operands, outputType)) {
+      return new BeamSqlTimestampMinusTimestampExpression(operands, outputType);
+    } else if (isTimestampMinusInterval(operands, outputType)) {
+      return new BeamSqlTimestampMinusIntervalExpression(operands, outputType);
+    }
+
+    return null;
   }
 
-  /**
-   * Returns the count of intervals between dates, times TimeUnit.multiplier of the interval type.
-   * Calcite deals with all intervals this way. Whenever there is an interval, its value is always
-   * multiplied by the corresponding TimeUnit.multiplier
-   */
-  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
-    DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
-    DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+  private boolean isTimestampMinusTimestamp(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
+
+    return BeamSqlTimestampMinusTimestampExpression.accept(operands, outputType);
+  }
 
-    long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
-    long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+  private boolean isTimestampMinusInterval(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
 
-    return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+    return BeamSqlTimestampMinusIntervalExpression.accept(operands, outputType);
   }
 
-  private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
-    Period period = new Period(timestampStart, timestampEnd,
-        PeriodType.forFields(new DurationFieldType[] { durationFieldType(intervalType) }));
-    return period.get(durationFieldType(intervalType));
+  @Override
+  public boolean accept() {
+    return delegateExpression != null && delegateExpression.accept();
   }
 
-  private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
-    if (!INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
-        throw new IllegalArgumentException("Counting "
-            + intervalTypeToCount.getName() + "s between dates is not supported");
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    if (delegateExpression == null) {
+      throw new IllegalStateException("Unable to execute unsupported 'datetime minus' expression");
     }
 
-    return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+    return delegateExpression.evaluate(inputRow, window);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
new file mode 100644
index 0000000..236d148
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+
+/**
+ * '-' operator for 'timestamp - interval' expressions: subtracts operand 1 from operand 0.
+ *
+ * <p>See {@link BeamSqlDatetimeMinusExpression} for the other kinds of datetime subtraction.
+ */
+public class BeamSqlTimestampMinusIntervalExpression extends BeamSqlExpression {
+
+  public BeamSqlTimestampMinusIntervalExpression(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override
+  public boolean accept() {
+    return accept(operands, outputType);
+  }
+
+  static boolean accept(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    return operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(outputType)
+        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+        && INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord row, BoundedWindow window) {
+    DateTime date = new DateTime(opValueEvaluated(0, row, window));
+    Period period = intervalToPeriod(op(1).evaluate(row, window));
+
+    Date subtractionResult = date.minus(period).toDate();
+
+    return BeamSqlPrimitive.of(outputType, subtractionResult);
+  }
+
+  private Period intervalToPeriod(BeamSqlPrimitive operand) {
+    BigDecimal intervalValue = operand.getDecimal();
+    SqlTypeName intervalType = operand.getOutputType();
+
+    int numberOfIntervals = intervalValue
+        .divide(TimeUnitUtils.timeUnitInternalMultiplier(intervalType)).intValueExact();
+
+    return new Period().withField(durationFieldType(intervalType), numberOfIntervals);
+  }
+
+  private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+    return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
new file mode 100644
index 0000000..64ac9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+
+/**
+ * Infix '-' operation for 'timestamp - timestamp' expressions.
+ *
+ * <p>Currently this implementation is specific to how Calcite parses 'TIMESTAMPDIFF(..)'.
+ * Calcite converts the TIMESTAMPDIFF() call into infix minus and normalizes it
+ * with the corresponding TimeUnit's multiplier.
+ *
+ * <p>See {@link BeamSqlDatetimeMinusExpression} for the other kinds of datetime subtraction.
+ */
+public class BeamSqlTimestampMinusTimestampExpression extends BeamSqlExpression {
+  private SqlTypeName intervalType;
+
+  public BeamSqlTimestampMinusTimestampExpression(
+      List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+    super(operands, SqlTypeName.BIGINT);
+    this.intervalType = intervalType;
+  }
+
+  /**
+   * Requires exactly 2 operands, both timestamps, and a supported interval type to count.
+   */
+  @Override
+  public boolean accept() {
+    return accept(operands, intervalType);
+  }
+
+  static boolean accept(List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+    return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
+        && operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+        && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+  }
+
+  /**
+   * Returns the count of intervals from operand 1 to operand 0, times the interval type's
+   * TimeUnit.multiplier. Calcite deals with all intervals this way: whenever there is an
+   * interval, its value is always multiplied by the corresponding TimeUnit.multiplier.
+   */
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
+    DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+
+    long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
+    long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+
+    return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+  }
+
+  private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
+    Period period = new Period(timestampStart, timestampEnd,
+        PeriodType.forFields(new DurationFieldType[] { durationFieldType(intervalType) }));
+    return period.get(durationFieldType(intervalType));
+  }
+
+  private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+    if (!INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
+      throw new IllegalArgumentException("Counting "
+          + intervalTypeToCount.getName() + "s between dates is not supported");
+    }
+
+    return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
index e0fe166..ef837ca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Date;
 
@@ -32,201 +33,101 @@ import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.joda.time.DateTime;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 /**
  * Unit tests for {@link BeamSqlDatetimeMinusExpression}.
  */
 public class BeamSqlDatetimeMinusExpressionTest {
+
   private static final BeamRecord NULL_ROW = null;
   private static final BoundedWindow NULL_WINDOW = null;
 
-  private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
+  private static final Date DATE = new Date(329281L);
   private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
-  private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
-  private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
-  private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
-  private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
-  private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
 
-  @Rule public ExpectedException thrown = ExpectedException.none();
+  private static final BeamSqlPrimitive TIMESTAMP = BeamSqlPrimitive.of(
+      SqlTypeName.TIMESTAMP, DATE);
 
-  @Test public void testOutputTypeIsBigint() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE));
+  private static final BeamSqlPrimitive TIMESTAMP_MINUS_2_SEC = BeamSqlPrimitive.of(
+      SqlTypeName.TIMESTAMP, DATE_MINUS_2_SEC);
 
-    assertEquals(SqlTypeName.BIGINT, minusExpression.getOutputType());
-  }
+  private static final BeamSqlPrimitive INTERVAL_2_SEC = BeamSqlPrimitive.of(
+      SqlTypeName.INTERVAL_SECOND, TimeUnit.SECOND.multiplier.multiply(new BigDecimal(2)));
 
-  @Test public void testAccepts2Timestamps() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE));
+  private static final BeamSqlPrimitive STRING = BeamSqlPrimitive.of(
+      SqlTypeName.VARCHAR, "hello");
 
-    assertTrue(minusExpression.accept());
-  }
+  private static final BeamSqlPrimitive INTERVAL_3_MONTHS = BeamSqlPrimitive.of(
+      SqlTypeName.INTERVAL_MONTH, TimeUnit.MONTH.multiplier.multiply(new BigDecimal(3)));
 
-  @Test public void testDoesNotAccept3Timestamps() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE_MINUS_1_YEAR),
-            timestamp(DATE));
+  @Test public void testOutputType() {
+    BeamSqlDatetimeMinusExpression minusExpression1 =
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+    BeamSqlDatetimeMinusExpression minusExpression2 =
+        minusExpression(SqlTypeName.BIGINT, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
 
-    assertFalse(minusExpression.accept());
+    assertEquals(SqlTypeName.TIMESTAMP, minusExpression1.getOutputType());
+    assertEquals(SqlTypeName.BIGINT, minusExpression2.getOutputType());
   }
 
-  @Test public void testDoesNotAccept1Timestamp() {
+  @Test public void testAcceptsTimestampMinusTimestamp() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE));
+        minusExpression(SqlTypeName.INTERVAL_SECOND, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
 
-    assertFalse(minusExpression.accept());
+    assertTrue(minusExpression.accept());
   }
 
-  @Test public void testDoesNotAcceptUnsupportedIntervalToCount() {
+  @Test public void testAcceptsTimestampMinusInteval() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY_MINUTE,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE));
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
 
-    assertFalse(minusExpression.accept());
+    assertTrue(minusExpression.accept());
   }
 
-  @Test public void testDoesNotAcceptNotTimestampAsOperandOne() {
+  @Test public void testDoesNotAcceptUnsupportedReturnType() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
-            timestamp(DATE));
+        minusExpression(SqlTypeName.BIGINT, TIMESTAMP, INTERVAL_2_SEC);
 
     assertFalse(minusExpression.accept());
   }
 
-  @Test public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+  @Test public void testDoesNotAcceptUnsupportedFirstOperand() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE),
-            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+        minusExpression(SqlTypeName.TIMESTAMP, STRING, INTERVAL_2_SEC);
 
     assertFalse(minusExpression.accept());
   }
 
-  @Test public void testEvaluateDiffSeconds() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_SECOND,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_2_SEC));
-
-    long expectedResult = applyMultiplier(2L, TimeUnit.SECOND);
-    assertEquals(expectedResult, eval(minusExpression));
-  }
-
-  @Test public void testEvaluateDiffMinutes() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_MINUTE,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_3_MIN));
-
-    long expectedResult = applyMultiplier(3L, TimeUnit.MINUTE);
-    assertEquals(expectedResult, eval(minusExpression));
-  }
-
-  @Test public void testEvaluateDiffHours() {
+  @Test public void testDoesNotAcceptUnsupportedSecondOperand() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_HOUR,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_4_HOURS));
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, STRING);
 
-    long expectedResult = applyMultiplier(4L, TimeUnit.HOUR);
-    assertEquals(expectedResult, eval(minusExpression));
+    assertFalse(minusExpression.accept());
   }
 
-  @Test public void testEvaluateDiffDays() {
+  @Test public void testEvaluateTimestampMinusTimestamp() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_7_DAYS));
+        minusExpression(SqlTypeName.INTERVAL_SECOND, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
 
-    long expectedResult = applyMultiplier(7L, TimeUnit.DAY);
-    assertEquals(expectedResult, eval(minusExpression));
-  }
+    BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
 
-  @Test public void testEvaluateDiffMonths() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_MONTH,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_2_MONTHS));
-
-    long expectedResult = applyMultiplier(2L, TimeUnit.MONTH);
-    assertEquals(expectedResult, eval(minusExpression));
+    assertEquals(SqlTypeName.BIGINT, subtractionResult.getOutputType());
+    assertEquals(2L * TimeUnit.SECOND.multiplier.longValue(), subtractionResult.getLong());
   }
 
-  @Test public void testEvaluateDiffYears() {
+  @Test public void testEvaluateTimestampMinusInteval() {
     BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_YEAR,
-            timestamp(DATE),
-            timestamp(DATE_MINUS_1_YEAR));
-
-    long expectedResult = applyMultiplier(1L, TimeUnit.YEAR);
-    assertEquals(expectedResult, eval(minusExpression));
-  }
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
 
-  @Test public void testEvaluateNegativeDiffSeconds() {
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_SECOND,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE));
+    BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
 
-    long expectedResult = applyMultiplier(-2L, TimeUnit.SECOND);
-    assertEquals(expectedResult, eval(minusExpression));
-  }
-
-  @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
-
-    thrown.expect(IllegalArgumentException.class);
-
-    BeamSqlDatetimeMinusExpression minusExpression =
-        minusExpression(
-            SqlTypeName.INTERVAL_DAY_MINUTE,
-            timestamp(DATE_MINUS_2_SEC),
-            timestamp(DATE));
-
-    eval(minusExpression);
+    assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+    assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
   }
 
   private static BeamSqlDatetimeMinusExpression minusExpression(
-      SqlTypeName intervalsToCount, BeamSqlExpression ... operands) {
-    return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), intervalsToCount);
-  }
-
-  private BeamSqlExpression timestamp(Date date) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
-  }
-
-  private long eval(BeamSqlDatetimeMinusExpression minusExpression) {
-    return minusExpression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
-  }
-
-  private long applyMultiplier(long value, TimeUnit timeUnit) {
-    return value * timeUnit.multiplier.longValue();
+      SqlTypeName outputType, BeamSqlExpression ... operands) {
+    return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), outputType);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
new file mode 100644
index 0000000..5232487
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlTimestampMinusIntervalExpression}.
+ */
+public class BeamSqlTimestampMinusIntervalExpressionTest {
+  private static final BeamRecord NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  private static final Date DATE = new Date(329281L);
+  private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+
+  private static final BeamSqlPrimitive TIMESTAMP = BeamSqlPrimitive.of(
+      SqlTypeName.TIMESTAMP, DATE);
+
+  private static final BeamSqlPrimitive INTERVAL_2_SEC = BeamSqlPrimitive.of(
+      SqlTypeName.INTERVAL_SECOND, TimeUnit.SECOND.multiplier.multiply(new BigDecimal(2)));
+
+  private static final BeamSqlPrimitive INTERVAL_3_MONTHS = BeamSqlPrimitive.of(
+      SqlTypeName.INTERVAL_MONTH, TimeUnit.MONTH.multiplier.multiply(new BigDecimal(3)));
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test public void testBasicProperties() {
+    BeamSqlTimestampMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.INTERVAL_DAY_MINUTE, TIMESTAMP, INTERVAL_3_MONTHS);
+
+    assertEquals(SqlTypeName.INTERVAL_DAY_MINUTE, minusExpression.getOutputType());
+    assertEquals(Arrays.asList(TIMESTAMP, INTERVAL_3_MONTHS), minusExpression.getOperands());
+  }
+
+  @Test public void testAcceptsHappyPath() {
+    BeamSqlTimestampMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+
+    assertTrue(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptOneOperand() {
+    BeamSqlTimestampMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP);
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptThreeOperands() {
+    BeamSqlTimestampMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC, INTERVAL_3_MONTHS);
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptWrongOutputType() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.remove(SqlTypeName.TIMESTAMP);
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlTimestampMinusIntervalExpression minusExpression =
+          minusExpression(unsupportedType, TIMESTAMP, INTERVAL_2_SEC);
+      // Name the offending type so a failure inside the loop is diagnosable.
+      assertFalse("output type " + unsupportedType, minusExpression.accept());
+    }
+  }
+
+  @Test public void testDoesNotAcceptWrongFirstOperand() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.remove(SqlTypeName.TIMESTAMP);
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+      BeamSqlTimestampMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, unsupportedOperand, INTERVAL_2_SEC);
+      // Name the offending type so a failure inside the loop is diagnosable.
+      assertFalse("first operand " + unsupportedType, minusExpression.accept());
+    }
+  }
+
+  @Test public void testDoesNotAcceptWrongSecondOperand() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.removeAll(INTERVALS_DURATIONS_TYPES.keySet());
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+      BeamSqlTimestampMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, unsupportedOperand);
+      // Name the offending type so a failure inside the loop is diagnosable.
+      assertFalse("second operand " + unsupportedType, minusExpression.accept());
+    }
+  }
+
+  @Test public void testAcceptsAllSupportedIntervalTypes() {
+    for (SqlTypeName supportedType : INTERVALS_DURATIONS_TYPES.keySet()) {
+      BeamSqlPrimitive supportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(supportedType).when(supportedOperand).getOutputType();
+
+      BeamSqlTimestampMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, supportedOperand);
+      // Name the type under test so a failure inside the loop is diagnosable.
+      assertTrue("interval type " + supportedType, minusExpression.accept());
+    }
+  }
+
+  @Test public void testEvaluateHappyPath() {
+    BeamSqlTimestampMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+
+    BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
+
+    assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+    assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
+  }
+
+  private static BeamSqlTimestampMinusIntervalExpression minusExpression(
+      SqlTypeName outputType, BeamSqlExpression... operands) {
+    return new BeamSqlTimestampMinusIntervalExpression(Arrays.asList(operands), outputType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
new file mode 100644
index 0000000..54bf52d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlTimestampMinusTimestampExpression}.
+ */
+public class BeamSqlTimestampMinusTimestampExpressionTest {
+
+  private static final BeamRecord NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1); // NOTE: year is 1900-based
+  private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+  private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
+  private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
+  private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
+  private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
+  private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test public void testOutputTypeIsBigint() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertEquals(SqlTypeName.BIGINT, minusExpression.getOutputType());
+  }
+
+  @Test public void testAccepts2Timestamps() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertTrue(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAccept3Timestamps() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE_MINUS_1_YEAR),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAccept1Timestamp() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptUnsupportedIntervalToCount() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY_MINUTE,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptNotTimestampAsOperandOne() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test public void testEvaluateDiffSeconds() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_SECOND,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_2_SEC));
+
+    long expectedResult = applyMultiplier(2L, TimeUnit.SECOND);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateDiffMinutes() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_MINUTE,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_3_MIN));
+
+    long expectedResult = applyMultiplier(3L, TimeUnit.MINUTE);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateDiffHours() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_HOUR,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_4_HOURS));
+
+    long expectedResult = applyMultiplier(4L, TimeUnit.HOUR);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateDiffDays() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_7_DAYS));
+
+    long expectedResult = applyMultiplier(7L, TimeUnit.DAY);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateDiffMonths() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_MONTH,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_2_MONTHS));
+
+    long expectedResult = applyMultiplier(2L, TimeUnit.MONTH);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateDiffYears() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_YEAR,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_1_YEAR));
+
+    long expectedResult = applyMultiplier(1L, TimeUnit.YEAR);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateNegativeDiffSeconds() {
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_SECOND,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    long expectedResult = applyMultiplier(-2L, TimeUnit.SECOND);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
+
+    thrown.expect(IllegalArgumentException.class);
+
+    BeamSqlTimestampMinusTimestampExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY_MINUTE,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    eval(minusExpression);
+  }
+
+  private static BeamSqlTimestampMinusTimestampExpression minusExpression(
+      SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+    return new BeamSqlTimestampMinusTimestampExpression(Arrays.asList(operands), intervalsToCount);
+  }
+
+  private BeamSqlExpression timestamp(Date date) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
+  }
+
+  private long eval(BeamSqlTimestampMinusTimestampExpression minusExpression) {
+    return minusExpression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
+  }
+
+  private long applyMultiplier(long value, TimeUnit timeUnit) {
+    return value * timeUnit.multiplier.longValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index ba901d3..ec5b295 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -143,6 +143,24 @@ public class BeamSqlDateFunctionsIntegrationTest
     checker.buildRunAndCheck();
   }
 
+  @Test public void testTimestampMinusInterval() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND",
+            parseDate("1984-04-19 01:01:56"))
+        .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE",
+            parseDate("1984-04-19 01:00:58"))
+        .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR",
+            parseDate("1984-04-18 21:01:58"))
+        .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY",
+            parseDate("1984-04-14 01:01:58"))
+        .addExpr("TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH",
+            parseDate("1983-11-19 01:01:58"))
+        .addExpr("TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
+            parseDate("1983-01-19 01:01:58"))
+        ;
+    checker.buildRunAndCheck();
+  }
+
   @Test public void testDateTimeFunctions_currentTime() throws Exception {
     String sql = "SELECT "
         + "LOCALTIME as l,"


[2/2] beam git commit: This closes #4082

Posted by xu...@apache.org.
This closes #4082


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0166ceb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0166ceb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0166ceb

Branch: refs/heads/master
Commit: e0166ceb657b393e4037ac7c178797a864379745
Parents: 29942939 0a63eb2
Author: James Xu <xu...@gmail.com>
Authored: Wed Nov 8 11:59:07 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Wed Nov 8 11:59:07 2017 +0800

----------------------------------------------------------------------
 .../date/BeamSqlDatetimeMinusExpression.java    | 104 ++++-----
 ...BeamSqlTimestampMinusIntervalExpression.java |  83 +++++++
 ...eamSqlTimestampMinusTimestampExpression.java |  97 ++++++++
 .../BeamSqlDatetimeMinusExpressionTest.java     | 189 ++++-----------
 ...SqlTimestampMinusIntervalExpressionTest.java | 163 +++++++++++++
 ...qlTimestampMinusTimestampExpressionTest.java | 233 +++++++++++++++++++
 .../BeamSqlDateFunctionsIntegrationTest.java    |  18 ++
 7 files changed, 691 insertions(+), 196 deletions(-)
----------------------------------------------------------------------