You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/07 15:54:52 UTC

[1/2] beam git commit: [BEAM-2203] Implement TIMESTAMPDIFF()

Repository: beam
Updated Branches:
  refs/heads/master 269bf8946 -> f80bf2fdf


[BEAM-2203] Implement TIMESTAMPDIFF()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fe7480a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fe7480a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fe7480a

Branch: refs/heads/master
Commit: 4fe7480a749699e0d1418dbe0b29e6ecefd08497
Parents: 269bf89
Author: Anton Kedin <ke...@kedin-macbookpro.roam.corp.google.com>
Authored: Mon Oct 30 17:36:21 2017 -0700
Committer: James Xu <xu...@gmail.com>
Committed: Tue Nov 7 23:09:20 2017 +0800

----------------------------------------------------------------------
 sdks/java/extensions/sql/pom.xml                |   8 +
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |   9 +-
 .../operator/BeamSqlReinterpretExpression.java  |  55 -----
 .../date/BeamSqlDatetimeMinusExpression.java    | 107 +++++++++
 .../BeamSqlReinterpretExpression.java           |  58 +++++
 .../DatetimeReinterpretConversions.java         |  59 +++++
 .../IntegerReinterpretConversions.java          |  44 ++++
 .../reinterpret/ReinterpretConversion.java      | 114 +++++++++
 .../operator/reinterpret/Reinterpreter.java     | 101 ++++++++
 .../operator/reinterpret/package-info.java      |  22 ++
 .../impl/interpreter/BeamSqlFnExecutorTest.java |  14 ++
 .../BeamSqlReinterpretExpressionTest.java       | 116 +++++++---
 .../BeamSqlDatetimeMinusExpressionTest.java     | 232 +++++++++++++++++++
 .../DatetimeReinterpretConversionsTest.java     |  73 ++++++
 .../IntegerReinterpretConversionsTest.java      |  81 +++++++
 .../reinterpret/ReinterpretConversionTest.java  | 116 ++++++++++
 .../operator/reinterpret/ReinterpreterTest.java | 155 +++++++++++++
 .../BeamSqlDateFunctionsIntegrationTest.java    |  52 +++++
 18 files changed, 1330 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index 742d3b6..34ff670 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -37,6 +37,7 @@
     <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
     <calcite.version>1.13.0</calcite.version>
     <avatica.version>1.10.0</avatica.version>
+    <mockito.version>1.9.5</mockito.version>
   </properties>
 
   <profiles>
@@ -271,5 +272,12 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 8770055..31d5022 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastE
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlReinterpretExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
@@ -50,6 +49,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSql
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
@@ -235,7 +236,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
           ret = new BeamSqlPlusExpression(subExps);
           break;
         case "-":
-          ret = new BeamSqlMinusExpression(subExps);
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            ret = new BeamSqlMinusExpression(subExps);
+          } else {
+            ret = new BeamSqlDatetimeMinusExpression(subExps, node.type.getSqlTypeName());
+          }
           break;
         case "*":
           if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
deleted file mode 100644
index 2ec4fb5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for REINTERPRET.
- *
- * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
- * to {@code BIGINT} is supported.
- */
-public class BeamSqlReinterpretExpression extends BeamSqlExpression {
-  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  @Override public boolean accept() {
-    return getOperands().size() == 1
-        && outputType == SqlTypeName.BIGINT
-        && SqlTypeName.DATETIME_TYPES.contains(opType(0));
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
-    if (opType(0) == SqlTypeName.TIME) {
-      GregorianCalendar date = opValueEvaluated(0, inputRow, window);
-      return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
-
-    } else {
-      Date date = opValueEvaluated(0, inputRow, window);
-      return BeamSqlPrimitive.of(outputType, date.getTime());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
new file mode 100644
index 0000000..6a5cbb1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+
+/**
+ * Infix '-' operation for timestamps.
+ *
+ * <p>Currently this implementation is specific to how Calcite implements 'TIMESTAMPDIFF(..)'.
+ * It converts the TIMESTAMPDIFF() call into infix minus and normalizes it
+ * with corresponding TimeUnit's multiplier.
+ *
+ * <p>In addition to this TIMESTAMPDIFF(..) implementation, Calcite also supports infix
+ * operations 'interval - interval' and 'timestamp - interval'. These are not implemented yet.
+ */
+public class BeamSqlDatetimeMinusExpression extends BeamSqlExpression {
+  private final SqlTypeName intervalType;
+
+  private static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
+      ImmutableMap.<SqlTypeName, DurationFieldType>builder()
+      .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
+      .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
+      .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
+      .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
+      .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
+      .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
+      .build();
+
+  public BeamSqlDatetimeMinusExpression(
+      List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+    super(operands, SqlTypeName.BIGINT);
+    this.intervalType = intervalType;
+  }
+
+  /**
+   * Requires exactly 2 operands, both timestamps, and a supported interval type.
+   */
+  @Override
+  public boolean accept() {
+    return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
+        && operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+        && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+  }
+
+  /**
+   * Returns the count of intervals between dates, times TimeUnit.multiplier of the interval type.
+   * Calcite deals with all intervals this way. Whenever there is an interval, its value is always
+   * multiplied by the corresponding TimeUnit.multiplier.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    // Operand 0 is the end timestamp and operand 1 the start: the result counts start -> end.
+    DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
+    DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+    long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
+    long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+  }
+
+  /** Counts {@code intervalType} units between the timestamps via a single-field period. */
+  private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
+    DurationFieldType unit = durationFieldType(intervalType);
+    PeriodType unitOnly = PeriodType.forFields(new DurationFieldType[] { unit });
+    return new Period(timestampStart, timestampEnd, unitOnly).get(unit);
+  }
+
+  private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+    DurationFieldType fieldType = INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+    if (fieldType == null) {
+      throw new IllegalArgumentException("Counting "
+          + intervalTypeToCount.getName() + "s between dates is not supported");
+    }
+    return fieldType;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..b22fd09
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for Reinterpret call.
+ *
+ * <p>Currently supported conversions:
+ * <ul>
+ *   <li>{@code TIME}, {@code DATE} and {@code TIMESTAMP} to {@code BIGINT};</li>
+ *   <li>{@link SqlTypeName#INT_TYPES} to {@code BIGINT}.</li>
+ * </ul>
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  private static final Reinterpreter REINTERPRETER = Reinterpreter.builder()
+      .withConversion(DatetimeReinterpretConversions.TIME_TO_BIGINT)
+      .withConversion(DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT)
+      .withConversion(IntegerReinterpretConversions.INTEGER_TYPES_TO_BIGINT)
+      .build();
+
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Accepts exactly one operand whose type is convertible to the declared output type. */
+  @Override public boolean accept() {
+    return getOperands().size() == 1 && REINTERPRETER.canConvert(opType(0), getOutputType());
+  }
+
+  /** Converts the evaluated operand to the declared output type of this expression. */
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    return REINTERPRETER.convert(getOutputType(), operands.get(0).evaluate(inputRow, window));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
new file mode 100644
index 0000000..e6b405b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import javax.annotation.Nonnull;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility class to contain implementations of datetime SQL type conversions.
+ */
+public abstract class DatetimeReinterpretConversions {
+
+  public static final ReinterpretConversion TIME_TO_BIGINT =
+      ReinterpretConversion.builder()
+          .from(SqlTypeName.TIME)
+          .to(SqlTypeName.BIGINT)
+          .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+            @Override
+            public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive beamSqlPrimitive) {
+              GregorianCalendar date = (GregorianCalendar) beamSqlPrimitive.getValue();
+              return BeamSqlPrimitive.of(SqlTypeName.BIGINT, date.getTimeInMillis());
+            }
+          }).build();
+
+  public static final ReinterpretConversion DATE_TYPES_TO_BIGINT =
+      ReinterpretConversion.builder()
+          .from(SqlTypeName.DATE, SqlTypeName.TIMESTAMP)
+          .to(SqlTypeName.BIGINT)
+          .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+            @Override
+            public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive beamSqlPrimitive) {
+              Date date = (Date) beamSqlPrimitive.getValue();
+              return BeamSqlPrimitive.of(SqlTypeName.BIGINT, date.getTime());
+            }
+          }).build();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
new file mode 100644
index 0000000..9539909
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+
+import javax.annotation.Nonnull;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Reinterpret conversions defined on SQL integer types; values are read as {@link Number}.
+ */
+public abstract class IntegerReinterpretConversions {
+
+  public static final ReinterpretConversion INTEGER_TYPES_TO_BIGINT =
+      ReinterpretConversion.builder()
+          .to(SqlTypeName.BIGINT)
+          .from(SqlTypeName.INT_TYPES)
+          .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+            @Override
+            public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive integerValue) {
+              Number number = (Number) integerValue.getValue();
+              return BeamSqlPrimitive.of(SqlTypeName.BIGINT, number.longValue());
+            }
+          }).build();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
new file mode 100644
index 0000000..df29962
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Defines conversion between 2 SQL types.
+ */
+public class ReinterpretConversion {
+
+  /**
+   * Builder for {@link ReinterpretConversion}.
+   */
+  public static class Builder  {
+
+    private Set<SqlTypeName> from = new HashSet<>();
+    private SqlTypeName to;
+    private Function<BeamSqlPrimitive, BeamSqlPrimitive> convert;
+
+    public Builder from(SqlTypeName from) {
+      this.from.add(from);
+      return this;
+    }
+
+    public Builder from(Collection<SqlTypeName> from) {
+      this.from.addAll(from);
+      return this;
+    }
+
+    public Builder from(SqlTypeName ... from) {
+      return from(Arrays.asList(from));
+    }
+
+    public Builder to(SqlTypeName to) {
+      this.to = to;
+      return this;
+    }
+
+    public Builder convert(Function<BeamSqlPrimitive, BeamSqlPrimitive> convert) {
+      this.convert = convert;
+      return this;
+    }
+
+    public ReinterpretConversion build() {
+      if (from.isEmpty() || to == null || convert == null) {
+        throw new IllegalArgumentException("All arguments to ReinterpretConversion.Builder"
+            + " are mandatory.");
+      }
+      return new ReinterpretConversion(this);
+    }
+  }
+
+  private Set<SqlTypeName> from;
+  private SqlTypeName to;
+  private Function<BeamSqlPrimitive, BeamSqlPrimitive> convertFunction;
+
+  private ReinterpretConversion(Builder builder) {
+    this.from = ImmutableSet.copyOf(builder.from);
+    this.to = builder.to;
+    this.convertFunction = builder.convert;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public BeamSqlPrimitive convert(BeamSqlPrimitive input) {
+    if (!from.contains(input.getOutputType())) {
+      throw new IllegalArgumentException("Unable to convert from " + input.getOutputType().name()
+          + " to " + to.name() + ". This conversion only supports " + toString());
+    }
+
+    return convertFunction.apply(input);
+  }
+
+  public SqlTypeName to() {
+    return to;
+  }
+
+  public Set<SqlTypeName> from() {
+    return from;
+  }
+
+  @Override
+  public String toString() {
+    return from.toString() + "->" + to.name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
new file mode 100644
index 0000000..29d4ea4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Optional;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Registry of {@link ReinterpretConversion}s, looked up by source type and then target type.
+ */
+public class Reinterpreter {
+
+  /**
+   * Builder for Reinterpreter.
+   */
+  public static class Builder {
+
+    private Map<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> conversions = new HashMap<>();
+
+    /** Registers the conversion under each of its source types, replacing earlier entries. */
+    public Builder withConversion(ReinterpretConversion conversion) {
+      Set<SqlTypeName> fromTypes = conversion.from();
+      SqlTypeName toType = conversion.to();
+      for (SqlTypeName fromType : fromTypes) {
+        Map<SqlTypeName, ReinterpretConversion> byTarget = conversions.get(fromType);
+        if (byTarget == null) {
+          byTarget = new HashMap<>();
+          conversions.put(fromType, byTarget);
+        }
+        byTarget.put(toType, conversion);
+      }
+      return this;
+    }
+
+    public Reinterpreter build() {
+      if (conversions.isEmpty()) {
+        throw new IllegalArgumentException("Conversions should not be empty");
+      }
+      return new Reinterpreter(this);
+    }
+  }
+
+  private final Map<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> conversions;
+
+  private Reinterpreter(Builder builder) {
+    // Copy both map levels: later changes to the builder must not reach this instance.
+    this.conversions = new HashMap<>();
+    for (Map.Entry<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> entry
+        : builder.conversions.entrySet()) {
+      conversions.put(entry.getKey(),
+          new HashMap<SqlTypeName, ReinterpretConversion>(entry.getValue()));
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public boolean canConvert(SqlTypeName from, SqlTypeName to) {
+    return getConversion(from, to).isPresent();
+  }
+
+  public BeamSqlPrimitive convert(SqlTypeName to, BeamSqlPrimitive value) {
+    Optional<ReinterpretConversion> conversion = getConversion(value.getOutputType(), to);
+    if (!conversion.isPresent()) {
+      throw new UnsupportedOperationException("Unsupported conversion: "
+          + value.getOutputType().name() + "->" + to.name());
+    }
+    return conversion.get().convert(value);
+  }
+
+  private Optional<ReinterpretConversion> getConversion(SqlTypeName from, SqlTypeName to) {
+    Map<SqlTypeName, ReinterpretConversion> allConversionsFrom = conversions.get(from);
+    if (allConversionsFrom == null) {
+      return Optional.absent();
+    }
+    return Optional.fromNullable(allConversionsFrom.get(to));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
new file mode 100644
index 0000000..8694937
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for Reinterpret type conversions.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index c4583ec..382404e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
@@ -442,5 +443,18 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
     );
     exp = BeamSqlFnExecutor.buildExpression(rexNode);
     assertTrue(exp instanceof BeamSqlIntervalMultiplyExpression);
+
+    // minus for dates
+    rexNode = rexBuilder.makeCall(
+        TYPE_FACTORY.createSqlType(SqlTypeName.INTERVAL_DAY),
+        SqlStdOperatorTable.MINUS,
+        Arrays.<RexNode>asList(
+            rexBuilder.makeTimestampLiteral(Calendar.getInstance(), 1000),
+            rexBuilder.makeTimestampLiteral(Calendar.getInstance(), 1000)
+        )
+    );
+
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDatetimeMinusExpression);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
index e614fdf..3d7b8ad 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -22,11 +22,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.List;
+
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Test;
 
@@ -34,42 +37,97 @@ import org.junit.Test;
  * Test for {@code BeamSqlReinterpretExpression}.
  */
 public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase {
+  private static final long DATE_LONG = 1000L;
+  private static final Date DATE = new Date(DATE_LONG);
+  private static final GregorianCalendar CALENDAR = new GregorianCalendar(2017, 8, 9);
+
+  private static final BeamRecord NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  private static final BeamSqlExpression DATE_PRIMITIVE = BeamSqlPrimitive.of(
+      SqlTypeName.DATE, DATE);
+
+  private static final BeamSqlExpression TIME_PRIMITIVE = BeamSqlPrimitive.of(
+      SqlTypeName.TIME, CALENDAR);
+
+  private static final BeamSqlExpression TIMESTAMP_PRIMITIVE = BeamSqlPrimitive.of(
+      SqlTypeName.TIMESTAMP, DATE);
+
+  private static final BeamSqlExpression TINYINT_PRIMITIVE_5 = BeamSqlPrimitive.of(
+      SqlTypeName.TINYINT, (byte) 5);
+
+  private static final BeamSqlExpression SMALLINT_PRIMITIVE_6 = BeamSqlPrimitive.of(
+      SqlTypeName.SMALLINT, (short) 6);
+
+  private static final BeamSqlExpression INTEGER_PRIMITIVE_8 = BeamSqlPrimitive.of(
+      SqlTypeName.INTEGER, 8);
+
+  private static final BeamSqlExpression BIGINT_PRIMITIVE_15 = BeamSqlPrimitive.of(
+      SqlTypeName.BIGINT, 15L);
+
+  private static final BeamSqlExpression VARCHAR_PRIMITIVE = BeamSqlPrimitive.of(
+      SqlTypeName.VARCHAR, "hello");
 
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
+  @Test
+  public void testAcceptsDateTypes() throws Exception {
+    assertTrue(reinterpretExpression(DATE_PRIMITIVE).accept());
+    assertTrue(reinterpretExpression(TIMESTAMP_PRIMITIVE).accept());
+  }
+
+  @Test
+  public void testAcceptsTime() {
+    assertTrue(reinterpretExpression(TIME_PRIMITIVE).accept());
+  }
+
+  @Test
+  public void testAcceptsIntTypes() {
+    assertTrue(reinterpretExpression(TINYINT_PRIMITIVE_5).accept());
+    assertTrue(reinterpretExpression(SMALLINT_PRIMITIVE_6).accept());
+    assertTrue(reinterpretExpression(INTEGER_PRIMITIVE_8).accept());
+    assertTrue(reinterpretExpression(BIGINT_PRIMITIVE_15).accept());
+  }
 
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, new Date()));
-    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+  @Test
+  public void testDoesNotAcceptUnsupportedType() {
+    assertFalse(reinterpretExpression(VARCHAR_PRIMITIVE).accept());
+  }
 
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date()));
-    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+  @Test
+  public void testHasCorrectOutputType() {
+    BeamSqlReinterpretExpression reinterpretExpression1 =
+        new BeamSqlReinterpretExpression(Arrays.asList(DATE_PRIMITIVE), SqlTypeName.BIGINT);
+    assertEquals(SqlTypeName.BIGINT, reinterpretExpression1.getOutputType());
 
-    operands.clear();
-    GregorianCalendar calendar = new GregorianCalendar();
-    calendar.setTime(new Date());
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
-    assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+    BeamSqlReinterpretExpression reinterpretExpression2 =
+        new BeamSqlReinterpretExpression(Arrays.asList(DATE_PRIMITIVE), SqlTypeName.INTERVAL_YEAR);
+    assertEquals(SqlTypeName.INTERVAL_YEAR, reinterpretExpression2.getOutputType());
+  }
 
-    // currently only support reinterpret DATE
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+  @Test
+  public void evaluateDate() {
+    assertEquals(DATE_LONG, evaluateReinterpretExpression(DATE_PRIMITIVE));
+    assertEquals(DATE_LONG, evaluateReinterpretExpression(TIMESTAMP_PRIMITIVE));
+  }
 
-    // currently only support convert to BIGINT
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
-    assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.TINYINT).accept());
+  @Test
+  public void evaluateTime() {
+    assertEquals(CALENDAR.getTimeInMillis(), evaluateReinterpretExpression(TIME_PRIMITIVE));
   }
 
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
+  @Test
+  public void evaluateInts() {
+    assertEquals(5L, evaluateReinterpretExpression(TINYINT_PRIMITIVE_5));
+    assertEquals(6L, evaluateReinterpretExpression(SMALLINT_PRIMITIVE_6));
+    assertEquals(8L, evaluateReinterpretExpression(INTEGER_PRIMITIVE_8));
+    assertEquals(15L, evaluateReinterpretExpression(BIGINT_PRIMITIVE_15));
+  }
 
-    Date d = new Date();
-    d.setTime(1000);
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
-    assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
-        .evaluate(record, null).getValue());
+  private static long evaluateReinterpretExpression(BeamSqlExpression operand) {
+    return reinterpretExpression(operand).evaluate(NULL_ROW, NULL_WINDOW).getLong();
   }
 
+  private static BeamSqlReinterpretExpression reinterpretExpression(
+      BeamSqlExpression... operands) {
+    return new BeamSqlReinterpretExpression(Arrays.asList(operands), SqlTypeName.BIGINT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
new file mode 100644
index 0000000..e0fe166
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlDatetimeMinusExpression}.
+ */
+public class BeamSqlDatetimeMinusExpressionTest {
+  private static final BeamRecord NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  // NOTE: the deprecated Date(int, int, int, int, int, int) constructor takes the year as an
+  // offset from 1900 and a zero-based month, so this base instant falls in April 3917.
+  // The assertions below compare DATE against constants derived from it.
+  private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
+  private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+  private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
+  private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
+  private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
+  private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
+  private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test public void testOutputTypeIsBigint() {
+    BeamSqlDatetimeMinusExpression expression =
+        minusExpression(SqlTypeName.INTERVAL_DAY, timestamp(DATE_MINUS_2_SEC), timestamp(DATE));
+
+    assertEquals(SqlTypeName.BIGINT, expression.getOutputType());
+  }
+
+  @Test public void testAccepts2Timestamps() {
+    assertTrue(
+        accepts(SqlTypeName.INTERVAL_DAY, timestamp(DATE_MINUS_2_SEC), timestamp(DATE)));
+  }
+
+  @Test public void testDoesNotAccept3Timestamps() {
+    assertFalse(
+        accepts(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE_MINUS_1_YEAR),
+            timestamp(DATE)));
+  }
+
+  @Test public void testDoesNotAccept1Timestamp() {
+    assertFalse(accepts(SqlTypeName.INTERVAL_DAY, timestamp(DATE)));
+  }
+
+  @Test public void testDoesNotAcceptUnsupportedIntervalToCount() {
+    assertFalse(
+        accepts(SqlTypeName.INTERVAL_DAY_MINUTE, timestamp(DATE_MINUS_2_SEC), timestamp(DATE)));
+  }
+
+  @Test public void testDoesNotAcceptNotTimestampAsOperandOne() {
+    assertFalse(
+        accepts(
+            SqlTypeName.INTERVAL_DAY,
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+            timestamp(DATE)));
+  }
+
+  @Test public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+    assertFalse(
+        accepts(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)));
+  }
+
+  @Test public void testEvaluateDiffSeconds() {
+    assertDiff(SqlTypeName.INTERVAL_SECOND, DATE, DATE_MINUS_2_SEC, 2L, TimeUnit.SECOND);
+  }
+
+  @Test public void testEvaluateDiffMinutes() {
+    assertDiff(SqlTypeName.INTERVAL_MINUTE, DATE, DATE_MINUS_3_MIN, 3L, TimeUnit.MINUTE);
+  }
+
+  @Test public void testEvaluateDiffHours() {
+    assertDiff(SqlTypeName.INTERVAL_HOUR, DATE, DATE_MINUS_4_HOURS, 4L, TimeUnit.HOUR);
+  }
+
+  @Test public void testEvaluateDiffDays() {
+    assertDiff(SqlTypeName.INTERVAL_DAY, DATE, DATE_MINUS_7_DAYS, 7L, TimeUnit.DAY);
+  }
+
+  @Test public void testEvaluateDiffMonths() {
+    assertDiff(SqlTypeName.INTERVAL_MONTH, DATE, DATE_MINUS_2_MONTHS, 2L, TimeUnit.MONTH);
+  }
+
+  @Test public void testEvaluateDiffYears() {
+    assertDiff(SqlTypeName.INTERVAL_YEAR, DATE, DATE_MINUS_1_YEAR, 1L, TimeUnit.YEAR);
+  }
+
+  @Test public void testEvaluateNegativeDiffSeconds() {
+    assertDiff(SqlTypeName.INTERVAL_SECOND, DATE_MINUS_2_SEC, DATE, -2L, TimeUnit.SECOND);
+  }
+
+  @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
+    // The expectation is registered before the expression is even constructed.
+    thrown.expect(IllegalArgumentException.class);
+
+    eval(
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY_MINUTE,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE)));
+  }
+
+  /** Builds the expression under test for the given interval type and operands. */
+  private static BeamSqlDatetimeMinusExpression minusExpression(
+      SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+    return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), intervalsToCount);
+  }
+
+  /** Reports whether an expression built from the arguments passes {@code accept()}. */
+  private static boolean accepts(SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+    return minusExpression(intervalsToCount, operands).accept();
+  }
+
+  /** Wraps {@code date} in a TIMESTAMP primitive. */
+  private static BeamSqlExpression timestamp(Date date) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
+  }
+
+  /** Evaluates the expression with a null row and a null window, returning the long result. */
+  private static long eval(BeamSqlDatetimeMinusExpression expression) {
+    return expression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
+  }
+
+  /**
+   * Asserts that {@code left - right}, counted in {@code intervalType}, evaluates to
+   * {@code count} times the multiplier of {@code unit}.
+   */
+  private static void assertDiff(
+      SqlTypeName intervalType, Date left, Date right, long count, TimeUnit unit) {
+    BeamSqlDatetimeMinusExpression expression =
+        minusExpression(intervalType, timestamp(left), timestamp(right));
+
+    long expected = count * unit.multiplier.longValue();
+    assertEquals(expected, eval(expression));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
new file mode 100644
index 0000000..894d094
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DatetimeReinterpretConversions}.
+ */
+public class DatetimeReinterpretConversionsTest {
+  private static final long DATE_LONG = 1000L;
+  private static final Date DATE = new Date(DATE_LONG);
+  private static final GregorianCalendar CALENDAR = new GregorianCalendar(2017, 8, 9);
+
+  private static final BeamSqlPrimitive DATE_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.DATE, DATE);
+
+  private static final BeamSqlPrimitive TIME_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.TIME, CALENDAR);
+
+  private static final BeamSqlPrimitive TIMESTAMP_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE);
+
+  @Test public void testTimeToBigint() {
+    assertBigintResult(
+        DatetimeReinterpretConversions.TIME_TO_BIGINT.convert(TIME_PRIMITIVE),
+        CALENDAR.getTimeInMillis());
+  }
+
+  @Test public void testDateToBigint() {
+    assertBigintResult(
+        DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT.convert(DATE_PRIMITIVE),
+        DATE_LONG);
+  }
+
+  @Test public void testTimestampToBigint() {
+    assertBigintResult(
+        DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT.convert(TIMESTAMP_PRIMITIVE),
+        DATE_LONG);
+  }
+
+  /** Asserts that {@code actual} is a BIGINT primitive holding {@code expectedValue}. */
+  private static void assertBigintResult(BeamSqlPrimitive actual, long expectedValue) {
+    assertEquals(SqlTypeName.BIGINT, actual.getOutputType());
+    assertEquals(expectedValue, actual.getLong());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
new file mode 100644
index 0000000..2bf14f6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link IntegerReinterpretConversions}.
+ */
+public class IntegerReinterpretConversionsTest {
+
+  private static final BeamSqlPrimitive TINYINT_PRIMITIVE_5 =
+      BeamSqlPrimitive.of(SqlTypeName.TINYINT, (byte) 5);
+
+  private static final BeamSqlPrimitive SMALLINT_PRIMITIVE_6 =
+      BeamSqlPrimitive.of(SqlTypeName.SMALLINT, (short) 6);
+
+  private static final BeamSqlPrimitive INTEGER_PRIMITIVE_8 =
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 8);
+
+  private static final BeamSqlPrimitive BIGINT_PRIMITIVE_15 =
+      BeamSqlPrimitive.of(SqlTypeName.BIGINT, 15L);
+
+  @Test public void testTinyIntToBigint() {
+    assertConvertsToBigint(TINYINT_PRIMITIVE_5, 5L);
+  }
+
+  @Test public void testSmallIntToBigint() {
+    assertConvertsToBigint(SMALLINT_PRIMITIVE_6, 6L);
+  }
+
+  @Test public void testIntegerToBigint() {
+    assertConvertsToBigint(INTEGER_PRIMITIVE_8, 8L);
+  }
+
+  @Test public void testBigintToBigint() {
+    assertConvertsToBigint(BIGINT_PRIMITIVE_15, 15L);
+  }
+
+  /**
+   * Runs {@code INTEGER_TYPES_TO_BIGINT} on {@code input} and asserts that the result is a
+   * BIGINT primitive holding {@code expectedValue}.
+   */
+  private static void assertConvertsToBigint(BeamSqlPrimitive input, long expectedValue) {
+    BeamSqlPrimitive converted =
+        IntegerReinterpretConversions.INTEGER_TYPES_TO_BIGINT.convert(input);
+
+    assertEquals(SqlTypeName.BIGINT, converted.getOutputType());
+    assertEquals(expectedValue, converted.getLong());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
new file mode 100644
index 0000000..31cdab8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+/**
+ * Unit test for {@link ReinterpretConversion}.
+ */
+public class ReinterpretConversionTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** A built conversion reports the source types and target type it was configured with. */
+  @Test public void testNewInstanceProperties() {
+    Set<SqlTypeName> from = ImmutableSet.of(SqlTypeName.FLOAT, SqlTypeName.TIME);
+    SqlTypeName to = SqlTypeName.BOOLEAN;
+
+    ReinterpretConversion conversion = ReinterpretConversion.builder()
+        .from(from)
+        .to(to)
+        .convert(mockConversionFunction())
+        .build();
+
+    assertEquals(from, conversion.from());
+    assertEquals(to, conversion.to());
+  }
+
+  /** {@code convert} hands the input to the configured function and returns its result. */
+  @Test public void testConvert() {
+    BeamSqlPrimitive integerPrimitive = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3);
+    BeamSqlPrimitive booleanPrimitive = BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true);
+
+    Function<BeamSqlPrimitive, BeamSqlPrimitive> conversionFunction = mockConversionFunction();
+    doReturn(booleanPrimitive).when(conversionFunction).apply(same(integerPrimitive));
+
+    ReinterpretConversion conversion = ReinterpretConversion.builder()
+        .from(SqlTypeName.INTEGER)
+        .to(SqlTypeName.BOOLEAN)
+        .convert(conversionFunction)
+        .build();
+
+    BeamSqlPrimitive conversionResult = conversion.convert(integerPrimitive);
+
+    assertSame(booleanPrimitive, conversionResult);
+    verify(conversionFunction).apply(same(integerPrimitive));
+  }
+
+  @Test public void testBuilderThrowsWithoutFrom() {
+    thrown.expect(IllegalArgumentException.class);
+    ReinterpretConversion.builder()
+        .to(SqlTypeName.BOOLEAN)
+        .convert(mockConversionFunction())
+        .build();
+  }
+
+  @Test public void testBuilderThrowsWithoutTo() {
+    thrown.expect(IllegalArgumentException.class);
+    ReinterpretConversion.builder()
+        .from(SqlTypeName.BOOLEAN)
+        .convert(mockConversionFunction())
+        .build();
+  }
+
+  @Test public void testBuilderThrowsWithoutConversionFunction() {
+    thrown.expect(IllegalArgumentException.class);
+    ReinterpretConversion.builder()
+        .from(SqlTypeName.BOOLEAN)
+        .to(SqlTypeName.SMALLINT)
+        .build();
+  }
+
+  @Test public void testConvertThrowsForUnsupportedInput() {
+    thrown.expect(IllegalArgumentException.class);
+
+    ReinterpretConversion conversion = ReinterpretConversion.builder()
+        .from(SqlTypeName.DATE)
+        .to(SqlTypeName.BOOLEAN)
+        .convert(mockConversionFunction())
+        .build();
+
+    // The conversion only declares DATE as a source type, so an INTEGER input must be rejected.
+    conversion.convert(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+  }
+
+  /**
+   * Creates a Mockito mock of the conversion function.
+   *
+   * <p>{@code mock(Function.class)} yields a raw {@code Function}; the unchecked conversion to
+   * the parameterized type is confined to this helper.
+   */
+  @SuppressWarnings("unchecked")
+  private static Function<BeamSqlPrimitive, BeamSqlPrimitive> mockConversionFunction() {
+    return mock(Function.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
new file mode 100644
index 0000000..6406831
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
@@ -0,0 +1,155 @@
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.internal.util.collections.Sets;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit tests for {@link Reinterpreter}.
+ */
+public class ReinterpreterTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test public void testBuilderCreatesInstance() {
+    Reinterpreter reinterpreter = newReinterpreter();
+    assertNotNull(reinterpreter);
+  }
+
+  @Test public void testBuilderThrowsWithoutConverters() {
+    thrown.expect(IllegalArgumentException.class);
+    Reinterpreter.builder().build();
+  }
+
+  @Test public void testCanConvertBetweenSupportedTypes() {
+    Reinterpreter reinterpreter = Reinterpreter.builder()
+        .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+        .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+        .build();
+
+    assertTrue(reinterpreter.canConvert(SqlTypeName.SMALLINT, SqlTypeName.SYMBOL));
+    assertTrue(reinterpreter.canConvert(SqlTypeName.DATE, SqlTypeName.SYMBOL));
+    assertTrue(reinterpreter.canConvert(SqlTypeName.FLOAT, SqlTypeName.INTEGER));
+  }
+
+  @Test public void testCannotConvertFromUnsupportedTypes() {
+    Reinterpreter reinterpreter = Reinterpreter.builder()
+        .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+        .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+        .build();
+    // Every type other than the registered sources (DATE, SMALLINT, FLOAT) is unsupported.
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.removeAll(
+        Sets.newSet(SqlTypeName.DATE, SqlTypeName.SMALLINT, SqlTypeName.FLOAT));
+    // Probe both registered targets (SYMBOL, INTEGER); DATE is never a target here.
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      assertFalse(reinterpreter.canConvert(unsupportedType, SqlTypeName.SYMBOL));
+      assertFalse(reinterpreter.canConvert(unsupportedType, SqlTypeName.INTEGER));
+    }
+  }
+
+  @Test public void testCannotConvertToUnsupportedTypes() {
+    Reinterpreter reinterpreter = Reinterpreter.builder()
+        .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+        .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+        .build();
+
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.removeAll(Sets.newSet(SqlTypeName.SYMBOL, SqlTypeName.INTEGER));
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      assertFalse(reinterpreter.canConvert(SqlTypeName.SMALLINT, unsupportedType));
+      assertFalse(reinterpreter.canConvert(SqlTypeName.DATE, unsupportedType));
+      assertFalse(reinterpreter.canConvert(SqlTypeName.FLOAT, unsupportedType));
+    }
+  }
+
+  @Test public void testConvert() {
+    Date date = new Date(12345L);
+    BeamSqlPrimitive stringPrimitive = BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello");
+    BeamSqlPrimitive datePrimitive = BeamSqlPrimitive.of(SqlTypeName.DATE, date);
+
+    ReinterpretConversion mockConversion = mock(ReinterpretConversion.class);
+    doReturn(Sets.newSet(SqlTypeName.VARCHAR)).when(mockConversion).from();
+    doReturn(SqlTypeName.DATE).when(mockConversion).to();
+    doReturn(datePrimitive).when(mockConversion).convert(same(stringPrimitive));
+
+    Reinterpreter reinterpreter = Reinterpreter.builder().withConversion(mockConversion).build();
+    BeamSqlPrimitive converted = reinterpreter.convert(SqlTypeName.DATE, stringPrimitive);
+
+    assertSame(datePrimitive, converted);
+    verify(mockConversion).convert(same(stringPrimitive));
+  }
+
+  @Test public void testConvertThrowsForUnsupportedFromType() {
+    thrown.expect(UnsupportedOperationException.class);
+
+    BeamSqlPrimitive intervalPrimitive = BeamSqlPrimitive
+        .of(SqlTypeName.INTERVAL_DAY, new BigDecimal(2));
+
+    Reinterpreter reinterpreter = newReinterpreter();
+    reinterpreter.convert(SqlTypeName.DATE, intervalPrimitive);
+  }
+
+  @Test public void testConvertThrowsForUnsupportedToType() {
+    thrown.expect(UnsupportedOperationException.class);
+
+    BeamSqlPrimitive stringPrimitive = BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello");
+
+    Reinterpreter reinterpreter = newReinterpreter();
+    reinterpreter.convert(SqlTypeName.INTERVAL_DAY, stringPrimitive);
+  }
+
+  private Reinterpreter newReinterpreter() {
+    return Reinterpreter.builder()
+        .withConversion(
+            mockConversion(
+                SqlTypeName.DATE,
+                SqlTypeName.SMALLINT, SqlTypeName.VARCHAR))
+        .build();
+  }
+
+  private ReinterpretConversion mockConversion(SqlTypeName convertTo, SqlTypeName ... convertFrom) {
+    ReinterpretConversion conversion = mock(ReinterpretConversion.class);
+
+    doReturn(Sets.newSet(convertFrom)).when(conversion).from();
+    doReturn(convertTo).when(conversion).to();
+
+    return conversion;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 6937a18..ba901d3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -91,6 +91,58 @@ public class BeamSqlDateFunctionsIntegrationTest
     checker.buildRunAndCheck();
   }
 
+  @Test public void testTimestampDiff() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:01:58')", 0)
+        .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:01:59')", 1)
+        .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:02:00')", 2)
+
+        .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:02:57')", 0)
+        .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:02:58')", 1)
+        .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 01:03:58')", 2)
+
+        .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 02:01:57')", 0)
+        .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 02:01:58')", 1)
+        .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-19 03:01:58')", 2)
+
+        .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-20 01:01:57')", 0)
+        .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-20 01:01:58')", 1)
+        .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+            + "TIMESTAMP '1984-04-21 01:01:58')", 2)
+
+        .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+            + "TIMESTAMP '1984-02-19 01:01:57')", 0)
+        .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+            + "TIMESTAMP '1984-02-19 01:01:58')", 1)
+        .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+            + "TIMESTAMP '1984-03-19 01:01:58')", 2)
+
+        .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+            + "TIMESTAMP '1982-01-19 01:01:57')", 0)
+        .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+            + "TIMESTAMP '1982-01-19 01:01:58')", 1)
+        .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+            + "TIMESTAMP '1983-01-19 01:01:58')", 2)
+
+        .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+            + "TIMESTAMP '1980-01-19 01:01:58')", -1)
+        .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+            + "TIMESTAMP '1979-01-19 01:01:58')", -2)
+    ;
+    checker.buildRunAndCheck();
+  }
+
   @Test public void testDateTimeFunctions_currentTime() throws Exception {
     String sql = "SELECT "
         + "LOCALTIME as l,"


[2/2] beam git commit: This closes #4065

Posted by xu...@apache.org.
This closes #4065


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f80bf2fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f80bf2fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f80bf2fd

Branch: refs/heads/master
Commit: f80bf2fdff655b62deb458b1e57d867bfbf03f41
Parents: 269bf89 4fe7480
Author: James Xu <xu...@gmail.com>
Authored: Tue Nov 7 23:11:03 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Tue Nov 7 23:11:03 2017 +0800

----------------------------------------------------------------------
 sdks/java/extensions/sql/pom.xml                |   8 +
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |   9 +-
 .../operator/BeamSqlReinterpretExpression.java  |  55 -----
 .../date/BeamSqlDatetimeMinusExpression.java    | 107 +++++++++
 .../BeamSqlReinterpretExpression.java           |  58 +++++
 .../DatetimeReinterpretConversions.java         |  59 +++++
 .../IntegerReinterpretConversions.java          |  44 ++++
 .../reinterpret/ReinterpretConversion.java      | 114 +++++++++
 .../operator/reinterpret/Reinterpreter.java     | 101 ++++++++
 .../operator/reinterpret/package-info.java      |  22 ++
 .../impl/interpreter/BeamSqlFnExecutorTest.java |  14 ++
 .../BeamSqlReinterpretExpressionTest.java       | 116 +++++++---
 .../BeamSqlDatetimeMinusExpressionTest.java     | 232 +++++++++++++++++++
 .../DatetimeReinterpretConversionsTest.java     |  73 ++++++
 .../IntegerReinterpretConversionsTest.java      |  81 +++++++
 .../reinterpret/ReinterpretConversionTest.java  | 116 ++++++++++
 .../operator/reinterpret/ReinterpreterTest.java | 155 +++++++++++++
 .../BeamSqlDateFunctionsIntegrationTest.java    |  52 +++++
 18 files changed, 1330 insertions(+), 86 deletions(-)
----------------------------------------------------------------------