You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/07 15:54:52 UTC
[1/2] beam git commit: [BEAM-2203] Implement TIMESTAMPDIFF()
Repository: beam
Updated Branches:
refs/heads/master 269bf8946 -> f80bf2fdf
[BEAM-2203] Implement TIMESTAMPDIFF()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fe7480a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fe7480a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fe7480a
Branch: refs/heads/master
Commit: 4fe7480a749699e0d1418dbe0b29e6ecefd08497
Parents: 269bf89
Author: Anton Kedin <ke...@kedin-macbookpro.roam.corp.google.com>
Authored: Mon Oct 30 17:36:21 2017 -0700
Committer: James Xu <xu...@gmail.com>
Committed: Tue Nov 7 23:09:20 2017 +0800
----------------------------------------------------------------------
sdks/java/extensions/sql/pom.xml | 8 +
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 9 +-
.../operator/BeamSqlReinterpretExpression.java | 55 -----
.../date/BeamSqlDatetimeMinusExpression.java | 107 +++++++++
.../BeamSqlReinterpretExpression.java | 58 +++++
.../DatetimeReinterpretConversions.java | 59 +++++
.../IntegerReinterpretConversions.java | 44 ++++
.../reinterpret/ReinterpretConversion.java | 114 +++++++++
.../operator/reinterpret/Reinterpreter.java | 101 ++++++++
.../operator/reinterpret/package-info.java | 22 ++
.../impl/interpreter/BeamSqlFnExecutorTest.java | 14 ++
.../BeamSqlReinterpretExpressionTest.java | 116 +++++++---
.../BeamSqlDatetimeMinusExpressionTest.java | 232 +++++++++++++++++++
.../DatetimeReinterpretConversionsTest.java | 73 ++++++
.../IntegerReinterpretConversionsTest.java | 81 +++++++
.../reinterpret/ReinterpretConversionTest.java | 116 ++++++++++
.../operator/reinterpret/ReinterpreterTest.java | 155 +++++++++++++
.../BeamSqlDateFunctionsIntegrationTest.java | 52 +++++
18 files changed, 1330 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index 742d3b6..34ff670 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -37,6 +37,7 @@
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
<calcite.version>1.13.0</calcite.version>
<avatica.version>1.10.0</avatica.version>
+ <mockito.version>1.9.5</mockito.version>
</properties>
<profiles>
@@ -271,5 +272,12 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 8770055..31d5022 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastE
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlReinterpretExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
@@ -50,6 +49,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSql
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
@@ -235,7 +236,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
ret = new BeamSqlPlusExpression(subExps);
break;
case "-":
- ret = new BeamSqlMinusExpression(subExps);
+ if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+ ret = new BeamSqlMinusExpression(subExps);
+ } else {
+ ret = new BeamSqlDatetimeMinusExpression(subExps, node.type.getSqlTypeName());
+ }
break;
case "*":
if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
deleted file mode 100644
index 2ec4fb5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for REINTERPRET.
- *
- * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
- * to {@code BIGINT} is supported.
- */
-public class BeamSqlReinterpretExpression extends BeamSqlExpression {
- public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
- super(operands, outputType);
- }
-
- @Override public boolean accept() {
- return getOperands().size() == 1
- && outputType == SqlTypeName.BIGINT
- && SqlTypeName.DATETIME_TYPES.contains(opType(0));
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
- if (opType(0) == SqlTypeName.TIME) {
- GregorianCalendar date = opValueEvaluated(0, inputRow, window);
- return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
-
- } else {
- Date date = opValueEvaluated(0, inputRow, window);
- return BeamSqlPrimitive.of(outputType, date.getTime());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
new file mode 100644
index 0000000..6a5cbb1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+
+/**
+ * Infix '-' operation for timestamps.
+ *
+ * <p>Currently this implementation is specific to how Calcite implements 'TIMESTAMPDIFF(..)':
+ * Calcite converts the TIMESTAMPDIFF() call into an infix minus and normalizes the result
+ * with the corresponding TimeUnit's multiplier.
+ *
+ * <p>In addition to this TIMESTAMPDIFF(..) implementation, Calcite also supports infix
+ * operations 'interval - interval' and 'timestamp - interval'.
+ * These are not implemented yet.
+ */
+public class BeamSqlDatetimeMinusExpression extends BeamSqlExpression {
+
+  /** Supported interval types, mapped to the Joda-Time field used to count them. */
+  private static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
+      ImmutableMap.<SqlTypeName, DurationFieldType>builder()
+          .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
+          .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
+          .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
+          .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
+          .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
+          .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
+          .build();
+
+  /** Interval type that selects both the unit being counted and the multiplier applied. */
+  private final SqlTypeName intervalType;
+
+  /**
+   * Creates the expression; its output type is always {@code BIGINT}.
+   *
+   * @param operands the two timestamp operands, minuend first
+   * @param intervalType the interval type in which the difference is expressed
+   */
+  public BeamSqlDatetimeMinusExpression(
+      List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+    super(operands, SqlTypeName.BIGINT);
+    this.intervalType = intervalType;
+  }
+
+  /**
+   * Requires a supported interval type and exactly 2 operands, both of type TIMESTAMP.
+   */
+  @Override
+  public boolean accept() {
+    return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
+        && operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+        && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+  }
+
+  /**
+   * Returns the count of intervals between dates, times TimeUnit.multiplier of the interval type.
+   *
+   * <p>Calcite deals with all intervals this way. Whenever there is an interval, its value is
+   * always multiplied by the corresponding TimeUnit.multiplier.
+   *
+   * <p>Operand 1 is used as the start and operand 0 as the end of the measured period.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
+    DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+
+    long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
+    long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+
+    return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+  }
+
+  /** Counts the intervals of {@link #intervalType} between the two timestamps. */
+  private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
+    // Resolve the field once: it both restricts the period and is read back from it.
+    DurationFieldType fieldType = durationFieldType(intervalType);
+    Period period = new Period(timestampStart, timestampEnd,
+        PeriodType.forFields(new DurationFieldType[] { fieldType }));
+    return period.get(fieldType);
+  }
+
+  /**
+   * Looks up the Joda-Time field for an interval type.
+   *
+   * @throws IllegalArgumentException if the interval type is not supported
+   */
+  private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+    // The map never holds null values, so a null result means "not supported".
+    DurationFieldType fieldType = INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+    if (fieldType == null) {
+      throw new IllegalArgumentException("Counting "
+          + intervalTypeToCount.getName() + "s between dates is not supported");
+    }
+
+    return fieldType;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..b22fd09
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} that implements the Reinterpret call.
+ *
+ * <p>The target type is always {@code BIGINT}. Supported source types:
+ * <ul>
+ *   <li>{@code TIME}, {@code DATE} and {@code TIMESTAMP};</li>
+ *   <li>the integer types listed in {@link SqlTypeName#INT_TYPES}.</li>
+ * </ul>
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+
+  /** Registry of every conversion this expression is able to perform. */
+  private static final Reinterpreter REINTERPRETER =
+      Reinterpreter.builder()
+          .withConversion(DatetimeReinterpretConversions.TIME_TO_BIGINT)
+          .withConversion(DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT)
+          .withConversion(IntegerReinterpretConversions.INTEGER_TYPES_TO_BIGINT)
+          .build();
+
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Accepts exactly one operand whose type can be converted to {@code BIGINT}.
+   */
+  @Override
+  public boolean accept() {
+    if (getOperands().size() != 1) {
+      return false;
+    }
+    return REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT);
+  }
+
+  /**
+   * Evaluates the single operand and converts its value to {@code BIGINT}.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    BeamSqlPrimitive operandValue = operands.get(0).evaluate(inputRow, window);
+    return REINTERPRETER.convert(SqlTypeName.BIGINT, operandValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
new file mode 100644
index 0000000..e6b405b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import javax.annotation.Nonnull;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility class to contain implementations of datetime SQL type conversions.
+ */
+public abstract class DatetimeReinterpretConversions {
+
+ public static final ReinterpretConversion TIME_TO_BIGINT =
+ ReinterpretConversion.builder()
+ .from(SqlTypeName.TIME)
+ .to(SqlTypeName.BIGINT)
+ .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+ @Override
+ public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive beamSqlPrimitive) {
+ GregorianCalendar date = (GregorianCalendar) beamSqlPrimitive.getValue();
+ return BeamSqlPrimitive.of(SqlTypeName.BIGINT, date.getTimeInMillis());
+ }
+ }).build();
+
+ public static final ReinterpretConversion DATE_TYPES_TO_BIGINT =
+ ReinterpretConversion.builder()
+ .from(SqlTypeName.DATE, SqlTypeName.TIMESTAMP)
+ .to(SqlTypeName.BIGINT)
+ .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+ @Override
+ public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive beamSqlPrimitive) {
+ Date date = (Date) beamSqlPrimitive.getValue();
+ return BeamSqlPrimitive.of(SqlTypeName.BIGINT, date.getTime());
+ }
+ }).build();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
new file mode 100644
index 0000000..9539909
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+
+import javax.annotation.Nonnull;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility class to contain implementations of SQL integer type conversions.
+ */
+public abstract class IntegerReinterpretConversions {
+
+ public static final ReinterpretConversion INTEGER_TYPES_TO_BIGINT =
+ ReinterpretConversion.builder()
+ .from(SqlTypeName.INT_TYPES)
+ .to(SqlTypeName.BIGINT)
+ .convert(new Function<BeamSqlPrimitive, BeamSqlPrimitive>() {
+ @Override
+ public BeamSqlPrimitive apply(@Nonnull BeamSqlPrimitive beamSqlPrimitive) {
+ Long value = ((Number) beamSqlPrimitive.getValue()).longValue();
+ return BeamSqlPrimitive.of(SqlTypeName.BIGINT, value);
+ }
+ }).build();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
new file mode 100644
index 0000000..df29962
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversion.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Defines conversion between 2 SQL types.
+ */
+public class ReinterpretConversion {
+
+ /**
+ * Builder for {@link ReinterpretConversion}.
+ */
+ public static class Builder {
+
+ private Set<SqlTypeName> from = new HashSet<>();
+ private SqlTypeName to;
+ private Function<BeamSqlPrimitive, BeamSqlPrimitive> convert;
+
+ public Builder from(SqlTypeName from) {
+ this.from.add(from);
+ return this;
+ }
+
+ public Builder from(Collection<SqlTypeName> from) {
+ this.from.addAll(from);
+ return this;
+ }
+
+ public Builder from(SqlTypeName ... from) {
+ return from(Arrays.asList(from));
+ }
+
+ public Builder to(SqlTypeName to) {
+ this.to = to;
+ return this;
+ }
+
+ public Builder convert(Function<BeamSqlPrimitive, BeamSqlPrimitive> convert) {
+ this.convert = convert;
+ return this;
+ }
+
+ public ReinterpretConversion build() {
+ if (from.isEmpty() || to == null || convert == null) {
+ throw new IllegalArgumentException("All arguments to ReinterpretConversion.Builder"
+ + " are mandatory.");
+ }
+ return new ReinterpretConversion(this);
+ }
+ }
+
+ private Set<SqlTypeName> from;
+ private SqlTypeName to;
+ private Function<BeamSqlPrimitive, BeamSqlPrimitive> convertFunction;
+
+ private ReinterpretConversion(Builder builder) {
+ this.from = ImmutableSet.copyOf(builder.from);
+ this.to = builder.to;
+ this.convertFunction = builder.convert;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public BeamSqlPrimitive convert(BeamSqlPrimitive input) {
+ if (!from.contains(input.getOutputType())) {
+ throw new IllegalArgumentException("Unable to convert from " + input.getOutputType().name()
+ + " to " + to.name() + ". This conversion only supports " + toString());
+ }
+
+ return convertFunction.apply(input);
+ }
+
+ public SqlTypeName to() {
+ return to;
+ }
+
+ public Set<SqlTypeName> from() {
+ return from;
+ }
+
+ @Override
+ public String toString() {
+ return from.toString() + "->" + to.name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
new file mode 100644
index 0000000..29d4ea4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/Reinterpreter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import com.google.common.base.Optional;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Registry of {@link ReinterpretConversion}s, looked up by source and target SQL type.
+ *
+ * <p>Instances are created through {@link #builder()} and hold their own copy of the
+ * registered conversions, so reusing a builder after {@code build()} does not affect them.
+ */
+public class Reinterpreter {
+
+  /**
+   * Builder for Reinterpreter.
+   */
+  public static class Builder {
+
+    private Map<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> conversions = new HashMap<>();
+
+    /**
+     * Registers the conversion under each of its source types.
+     *
+     * <p>A conversion registered later for the same source and target type replaces the
+     * earlier one.
+     */
+    public Builder withConversion(ReinterpretConversion conversion) {
+      Set<SqlTypeName> fromTypes = conversion.from();
+      SqlTypeName toType = conversion.to();
+
+      for (SqlTypeName fromType : fromTypes) {
+        Map<SqlTypeName, ReinterpretConversion> conversionsFrom = conversions.get(fromType);
+        if (conversionsFrom == null) {
+          conversionsFrom = new HashMap<SqlTypeName, ReinterpretConversion>();
+          conversions.put(fromType, conversionsFrom);
+        }
+
+        conversionsFrom.put(toType, conversion);
+      }
+
+      return this;
+    }
+
+    /**
+     * Creates the Reinterpreter.
+     *
+     * @throws IllegalArgumentException if no conversion has been registered
+     */
+    public Reinterpreter build() {
+      if (conversions.isEmpty()) {
+        throw new IllegalArgumentException("Conversions should not be empty");
+      }
+
+      return new Reinterpreter(this);
+    }
+  }
+
+  /** Conversions keyed by source type, then by target type. */
+  private final Map<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> conversions;
+
+  private Reinterpreter(Builder builder) {
+    // Deep copy: sharing the builder's maps would let later withConversion() calls on the
+    // same builder silently change an already built Reinterpreter.
+    Map<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> copy =
+        new HashMap<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>>();
+    for (Map.Entry<SqlTypeName, Map<SqlTypeName, ReinterpretConversion>> entry
+        : builder.conversions.entrySet()) {
+      copy.put(
+          entry.getKey(),
+          new HashMap<SqlTypeName, ReinterpretConversion>(entry.getValue()));
+    }
+    this.conversions = copy;
+  }
+
+  /** Starts building a new Reinterpreter. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Returns whether a conversion from {@code from} to {@code to} is registered. */
+  public boolean canConvert(SqlTypeName from, SqlTypeName to) {
+    return getConversion(from, to).isPresent();
+  }
+
+  /**
+   * Converts {@code value} to the type {@code to}, using the value's own type as the source.
+   *
+   * @throws UnsupportedOperationException if no matching conversion is registered
+   */
+  public BeamSqlPrimitive convert(SqlTypeName to, BeamSqlPrimitive value) {
+    Optional<ReinterpretConversion> conversion = getConversion(value.getOutputType(), to);
+    if (!conversion.isPresent()) {
+      throw new UnsupportedOperationException("Unsupported conversion: "
+          + value.getOutputType().name() + "->" + to.name());
+    }
+
+    return conversion.get().convert(value);
+  }
+
+  /** Looks up the conversion for the given type pair, if one is registered. */
+  private Optional<ReinterpretConversion> getConversion(SqlTypeName from, SqlTypeName to) {
+    Map<SqlTypeName, ReinterpretConversion> allConversionsFrom = conversions.get(from);
+    if (allConversionsFrom == null) {
+      return Optional.absent();
+    }
+
+    return Optional.fromNullable(allConversionsFrom.get(to));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
new file mode 100644
index 0000000..8694937
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for Reinterpret type conversions.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index c4583ec..382404e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
@@ -442,5 +443,18 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
);
exp = BeamSqlFnExecutor.buildExpression(rexNode);
assertTrue(exp instanceof BeamSqlIntervalMultiplyExpression);
+
+ // minus for dates
+ rexNode = rexBuilder.makeCall(
+ TYPE_FACTORY.createSqlType(SqlTypeName.INTERVAL_DAY),
+ SqlStdOperatorTable.MINUS,
+ Arrays.<RexNode>asList(
+ rexBuilder.makeTimestampLiteral(Calendar.getInstance(), 1000),
+ rexBuilder.makeTimestampLiteral(Calendar.getInstance(), 1000)
+ )
+ );
+
+ exp = BeamSqlFnExecutor.buildExpression(rexNode);
+ assertTrue(exp instanceof BeamSqlDatetimeMinusExpression);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
index e614fdf..3d7b8ad 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -22,11 +22,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
-import java.util.List;
+
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Test;
@@ -34,42 +37,97 @@ import org.junit.Test;
* Test for {@code BeamSqlReinterpretExpression}.
*/
public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase {
+ private static final long DATE_LONG = 1000L;
+ private static final Date DATE = new Date(DATE_LONG);
+ private static final GregorianCalendar CALENDAR = new GregorianCalendar(2017, 8, 9);
+
+ private static final BeamRecord NULL_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+
+ private static final BeamSqlExpression DATE_PRIMITIVE = BeamSqlPrimitive.of(
+ SqlTypeName.DATE, DATE);
+
+ private static final BeamSqlExpression TIME_PRIMITIVE = BeamSqlPrimitive.of(
+ SqlTypeName.TIME, CALENDAR);
+
+ private static final BeamSqlExpression TIMESTAMP_PRIMITIVE = BeamSqlPrimitive.of(
+ SqlTypeName.TIMESTAMP, DATE);
+
+ private static final BeamSqlExpression TINYINT_PRIMITIVE_5 = BeamSqlPrimitive.of(
+ SqlTypeName.TINYINT, (byte) 5);
+
+ private static final BeamSqlExpression SMALLINT_PRIMITIVE_6 = BeamSqlPrimitive.of(
+ SqlTypeName.SMALLINT, (short) 6);
+
+ private static final BeamSqlExpression INTEGER_PRIMITIVE_8 = BeamSqlPrimitive.of(
+ SqlTypeName.INTEGER, 8);
+
+ private static final BeamSqlExpression BIGINT_PRIMITIVE_15 = BeamSqlPrimitive.of(
+ SqlTypeName.BIGINT, 15L);
+
+ private static final BeamSqlExpression VARCHAR_PRIMITIVE = BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR, "hello");
- @Test public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
+ @Test
+ public void testAcceptsDateTypes() throws Exception {
+ assertTrue(reinterpretExpression(DATE_PRIMITIVE).accept());
+ assertTrue(reinterpretExpression(TIMESTAMP_PRIMITIVE).accept());
+ }
+
+ @Test
+ public void testAcceptsTime() {
+ assertTrue(reinterpretExpression(TIME_PRIMITIVE).accept());
+ }
+
+ @Test
+ public void testAcceptsIntTypes() {
+ assertTrue(reinterpretExpression(TINYINT_PRIMITIVE_5).accept());
+ assertTrue(reinterpretExpression(SMALLINT_PRIMITIVE_6).accept());
+ assertTrue(reinterpretExpression(INTEGER_PRIMITIVE_8).accept());
+ assertTrue(reinterpretExpression(BIGINT_PRIMITIVE_15).accept());
+ }
- operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, new Date()));
- assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+ @Test
+ public void testDoesNotAcceptUnsupportedType() {
+ assertFalse(reinterpretExpression(VARCHAR_PRIMITIVE).accept());
+ }
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date()));
- assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+ @Test
+ public void testHasCorrectOutputType() {
+ BeamSqlReinterpretExpression reinterpretExpression1 =
+ new BeamSqlReinterpretExpression(Arrays.asList(DATE_PRIMITIVE), SqlTypeName.BIGINT);
+ assertEquals(SqlTypeName.BIGINT, reinterpretExpression1.getOutputType());
- operands.clear();
- GregorianCalendar calendar = new GregorianCalendar();
- calendar.setTime(new Date());
- operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
- assertTrue(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+ BeamSqlReinterpretExpression reinterpretExpression2 =
+ new BeamSqlReinterpretExpression(Arrays.asList(DATE_PRIMITIVE), SqlTypeName.INTERVAL_YEAR);
+ assertEquals(SqlTypeName.INTERVAL_YEAR, reinterpretExpression2.getOutputType());
+ }
- // currently only support reinterpret DATE
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT).accept());
+ @Test
+ public void evaluateDate() {
+ assertEquals(DATE_LONG, evaluateReinterpretExpression(DATE_PRIMITIVE));
+ assertEquals(DATE_LONG, evaluateReinterpretExpression(TIMESTAMP_PRIMITIVE));
+ }
- // currently only support convert to BIGINT
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
- assertFalse(new BeamSqlReinterpretExpression(operands, SqlTypeName.TINYINT).accept());
+ @Test
+ public void evaluateTime() {
+ assertEquals(CALENDAR.getTimeInMillis(), evaluateReinterpretExpression(TIME_PRIMITIVE));
}
- @Test public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
+ @Test
+ public void evaluateInts() {
+ assertEquals(5L, evaluateReinterpretExpression(TINYINT_PRIMITIVE_5));
+ assertEquals(6L, evaluateReinterpretExpression(SMALLINT_PRIMITIVE_6));
+ assertEquals(8L, evaluateReinterpretExpression(INTEGER_PRIMITIVE_8));
+ assertEquals(15L, evaluateReinterpretExpression(BIGINT_PRIMITIVE_15));
+ }
- Date d = new Date();
- d.setTime(1000);
- operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
- assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
- .evaluate(record, null).getValue());
+ private static long evaluateReinterpretExpression(BeamSqlExpression operand) {
+ return reinterpretExpression(operand).evaluate(NULL_ROW, NULL_WINDOW).getLong();
}
+ private static BeamSqlReinterpretExpression reinterpretExpression(
+ BeamSqlExpression... operands) {
+ return new BeamSqlReinterpretExpression(Arrays.asList(operands), SqlTypeName.BIGINT);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
new file mode 100644
index 0000000..e0fe166
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlDatetimeMinusExpression}.
+ *
+ * <p>Each test builds the expression from an interval type plus operands and then checks its
+ * output type, the result of {@code accept()}, or the value produced by {@code evaluate()}.
+ */
+public class BeamSqlDatetimeMinusExpressionTest {
+  // Every operand in these tests is a literal primitive; null is passed for row and window.
+  private static final BeamRecord NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  // Built with Joda-Time instead of the deprecated Date(int, int, int, int, int, int)
+  // constructor, which adds 1900 to the year and counts months from zero (so the literal
+  // arguments 2017, 3, 4 would have meant April 4th, 3917). This is March 4th, 2017, 03:02:01.
+  private static final Date DATE = new DateTime(2017, 3, 4, 3, 2, 1).toDate();
+  private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+  private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
+  private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
+  private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
+  private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
+  private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** The output type is BIGINT even though the expression is built for an interval type. */
+  @Test
+  public void testOutputTypeIsBigint() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertEquals(SqlTypeName.BIGINT, minusExpression.getOutputType());
+  }
+
+  /** Exactly two TIMESTAMP operands are accepted. */
+  @Test
+  public void testAccepts2Timestamps() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertTrue(minusExpression.accept());
+  }
+
+  /** Three operands are rejected. */
+  @Test
+  public void testDoesNotAccept3Timestamps() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE_MINUS_1_YEAR),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  /** A single operand is rejected. */
+  @Test
+  public void testDoesNotAccept1Timestamp() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  /** INTERVAL_DAY_MINUTE is rejected as the interval type to count. */
+  @Test
+  public void testDoesNotAcceptUnsupportedIntervalToCount() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY_MINUTE,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  /** An INTEGER first operand is rejected. */
+  @Test
+  public void testDoesNotAcceptNotTimestampAsOperandOne() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+            timestamp(DATE));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  /** An INTEGER second operand is rejected. */
+  @Test
+  public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+
+    assertFalse(minusExpression.accept());
+  }
+
+  @Test
+  public void testEvaluateDiffSeconds() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_SECOND,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_2_SEC));
+
+    long expectedResult = applyMultiplier(2L, TimeUnit.SECOND);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test
+  public void testEvaluateDiffMinutes() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_MINUTE,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_3_MIN));
+
+    long expectedResult = applyMultiplier(3L, TimeUnit.MINUTE);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test
+  public void testEvaluateDiffHours() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_HOUR,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_4_HOURS));
+
+    long expectedResult = applyMultiplier(4L, TimeUnit.HOUR);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test
+  public void testEvaluateDiffDays() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_7_DAYS));
+
+    long expectedResult = applyMultiplier(7L, TimeUnit.DAY);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test
+  public void testEvaluateDiffMonths() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_MONTH,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_2_MONTHS));
+
+    long expectedResult = applyMultiplier(2L, TimeUnit.MONTH);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  @Test
+  public void testEvaluateDiffYears() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_YEAR,
+            timestamp(DATE),
+            timestamp(DATE_MINUS_1_YEAR));
+
+    long expectedResult = applyMultiplier(1L, TimeUnit.YEAR);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  /** Swapping the operands (earlier timestamp first) yields a negative result. */
+  @Test
+  public void testEvaluateNegativeDiffSeconds() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_SECOND,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    long expectedResult = applyMultiplier(-2L, TimeUnit.SECOND);
+    assertEquals(expectedResult, eval(minusExpression));
+  }
+
+  /** Evaluating with INTERVAL_DAY_MINUTE is expected to throw IllegalArgumentException. */
+  @Test
+  public void testEvaluateThrowsForUnsupportedIntervalType() {
+    thrown.expect(IllegalArgumentException.class);
+
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(
+            SqlTypeName.INTERVAL_DAY_MINUTE,
+            timestamp(DATE_MINUS_2_SEC),
+            timestamp(DATE));
+
+    eval(minusExpression);
+  }
+
+  /** Builds the expression under test from an interval type and its operands. */
+  private static BeamSqlDatetimeMinusExpression minusExpression(
+      SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+    return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), intervalsToCount);
+  }
+
+  /** Wraps {@code date} in a TIMESTAMP primitive so it can be used as an operand. */
+  private static BeamSqlExpression timestamp(Date date) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
+  }
+
+  /** Evaluates the expression with a null row and window and returns the result as a long. */
+  private static long eval(BeamSqlDatetimeMinusExpression minusExpression) {
+    return minusExpression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
+  }
+
+  /** Scales {@code value} by the multiplier of the given Calcite {@code timeUnit}. */
+  private static long applyMultiplier(long value, TimeUnit timeUnit) {
+    return value * timeUnit.multiplier.longValue();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
new file mode 100644
index 0000000..894d094
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/DatetimeReinterpretConversionsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DatetimeReinterpretConversions}.
+ *
+ * <p>Checks that TIME, DATE and TIMESTAMP primitives are converted to BIGINT primitives that
+ * carry the corresponding millisecond value.
+ */
+public class DatetimeReinterpretConversionsTest {
+  private static final long DATE_LONG = 1000L;
+  private static final Date DATE = new Date(DATE_LONG);
+  private static final GregorianCalendar CALENDAR = new GregorianCalendar(2017, 8, 9);
+
+  private static final BeamSqlPrimitive DATE_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.DATE, DATE);
+
+  private static final BeamSqlPrimitive TIME_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.TIME, CALENDAR);
+
+  private static final BeamSqlPrimitive TIMESTAMP_PRIMITIVE =
+      BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE);
+
+  @Test
+  public void testTimeToBigint() {
+    BeamSqlPrimitive converted =
+        DatetimeReinterpretConversions.TIME_TO_BIGINT.convert(TIME_PRIMITIVE);
+
+    assertIsBigint(CALENDAR.getTimeInMillis(), converted);
+  }
+
+  @Test
+  public void testDateToBigint() {
+    BeamSqlPrimitive converted =
+        DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT.convert(DATE_PRIMITIVE);
+
+    assertIsBigint(DATE_LONG, converted);
+  }
+
+  @Test
+  public void testTimestampToBigint() {
+    BeamSqlPrimitive converted =
+        DatetimeReinterpretConversions.DATE_TYPES_TO_BIGINT.convert(TIMESTAMP_PRIMITIVE);
+
+    assertIsBigint(DATE_LONG, converted);
+  }
+
+  /** Asserts that {@code actual} is a BIGINT primitive holding {@code expectedValue}. */
+  private static void assertIsBigint(long expectedValue, BeamSqlPrimitive actual) {
+    assertEquals(SqlTypeName.BIGINT, actual.getOutputType());
+    assertEquals(expectedValue, actual.getLong());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
new file mode 100644
index 0000000..2bf14f6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/IntegerReinterpretConversionsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link IntegerReinterpretConversions}.
+ *
+ * <p>Each test feeds one integer-typed primitive through {@code INTEGER_TYPES_TO_BIGINT} and
+ * checks that the result is a BIGINT primitive with the same numeric value.
+ */
+public class IntegerReinterpretConversionsTest {
+
+  private static final BeamSqlPrimitive TINYINT_PRIMITIVE_5 =
+      BeamSqlPrimitive.of(SqlTypeName.TINYINT, (byte) 5);
+
+  private static final BeamSqlPrimitive SMALLINT_PRIMITIVE_6 =
+      BeamSqlPrimitive.of(SqlTypeName.SMALLINT, (short) 6);
+
+  private static final BeamSqlPrimitive INTEGER_PRIMITIVE_8 =
+      BeamSqlPrimitive.of(SqlTypeName.INTEGER, 8);
+
+  private static final BeamSqlPrimitive BIGINT_PRIMITIVE_15 =
+      BeamSqlPrimitive.of(SqlTypeName.BIGINT, 15L);
+
+  @Test
+  public void testTinyIntToBigint() {
+    assertConvertsToBigint(5L, TINYINT_PRIMITIVE_5);
+  }
+
+  @Test
+  public void testSmallIntToBigint() {
+    assertConvertsToBigint(6L, SMALLINT_PRIMITIVE_6);
+  }
+
+  @Test
+  public void testIntegerToBigint() {
+    assertConvertsToBigint(8L, INTEGER_PRIMITIVE_8);
+  }
+
+  @Test
+  public void testBigintToBigint() {
+    assertConvertsToBigint(15L, BIGINT_PRIMITIVE_15);
+  }
+
+  /** Converts {@code input} and asserts the result is a BIGINT holding {@code expectedValue}. */
+  private static void assertConvertsToBigint(long expectedValue, BeamSqlPrimitive input) {
+    BeamSqlPrimitive converted =
+        IntegerReinterpretConversions.INTEGER_TYPES_TO_BIGINT.convert(input);
+
+    assertEquals(SqlTypeName.BIGINT, converted.getOutputType());
+    assertEquals(expectedValue, converted.getLong());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
new file mode 100644
index 0000000..31cdab8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpretConversionTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+/**
+ * Unit test for {@link ReinterpretConversion}.
+ */
+public class ReinterpretConversionTest {
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test public void testNewInstanceProperties() {
+ Set<SqlTypeName> from = ImmutableSet.of(SqlTypeName.FLOAT, SqlTypeName.TIME);
+ SqlTypeName to = SqlTypeName.BOOLEAN;
+ Function<BeamSqlPrimitive, BeamSqlPrimitive> mockConversionFunction = mock(Function.class);
+
+ ReinterpretConversion conversion = ReinterpretConversion.builder()
+ .from(from)
+ .to(to)
+ .convert(mockConversionFunction)
+ .build();
+
+ assertEquals(from, conversion.from());
+ assertEquals(to, conversion.to());
+ }
+
+ @Test public void testConvert() {
+ BeamSqlPrimitive integerPrimitive = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3);
+ BeamSqlPrimitive booleanPrimitive = BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true);
+
+ Function<BeamSqlPrimitive, BeamSqlPrimitive> mockConversionFunction = mock(Function.class);
+ doReturn(booleanPrimitive).when(mockConversionFunction).apply(same(integerPrimitive));
+
+ ReinterpretConversion conversion = ReinterpretConversion.builder()
+ .from(SqlTypeName.INTEGER)
+ .to(SqlTypeName.BOOLEAN)
+ .convert(mockConversionFunction)
+ .build();
+
+ BeamSqlPrimitive conversionResult = conversion.convert(integerPrimitive);
+
+ assertSame(booleanPrimitive, conversionResult);
+ verify(mockConversionFunction).apply(same(integerPrimitive));
+ }
+
+ @Test public void testBuilderThrowsWithoutFrom() {
+ thrown.expect(IllegalArgumentException.class);
+ ReinterpretConversion.builder()
+ .to(SqlTypeName.BOOLEAN)
+ .convert(mock(Function.class))
+ .build();
+ }
+
+ @Test public void testBuilderThrowsWihtoutTo() {
+ thrown.expect(IllegalArgumentException.class);
+ ReinterpretConversion.builder()
+ .from(SqlTypeName.BOOLEAN)
+ .convert(mock(Function.class))
+ .build();
+ }
+
+ @Test public void testBuilderThrowsWihtoutConversionFunction() {
+ thrown.expect(IllegalArgumentException.class);
+ ReinterpretConversion.builder()
+ .from(SqlTypeName.BOOLEAN)
+ .to(SqlTypeName.SMALLINT)
+ .build();
+ }
+
+ @Test public void testConvertThrowsForUnsupportedInput() {
+ thrown.expect(IllegalArgumentException.class);
+
+ ReinterpretConversion conversion = ReinterpretConversion.builder()
+ .from(SqlTypeName.DATE)
+ .to(SqlTypeName.BOOLEAN)
+ .convert(mock(Function.class))
+ .build();
+
+ conversion.convert(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
new file mode 100644
index 0000000..6406831
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/ReinterpreterTest.java
@@ -0,0 +1,155 @@
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.internal.util.collections.Sets;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit tests for {@link Reinterpreter}.
+ */
+public class ReinterpreterTest {
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test public void testBuilderCreatesInstance() {
+ Reinterpreter reinterpreter = newReinterpreter();
+ assertNotNull(reinterpreter);
+ }
+
+ @Test public void testBuilderThrowsWithoutConverters() {
+ thrown.expect(IllegalArgumentException.class);
+ Reinterpreter.builder().build();
+ }
+
+ @Test public void testCanConvertBetweenSupportedTypes() {
+ Reinterpreter reinterpreter = Reinterpreter.builder()
+ .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+ .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+ .build();
+
+ assertTrue(reinterpreter.canConvert(SqlTypeName.SMALLINT, SqlTypeName.SYMBOL));
+ assertTrue(reinterpreter.canConvert(SqlTypeName.DATE, SqlTypeName.SYMBOL));
+ assertTrue(reinterpreter.canConvert(SqlTypeName.FLOAT, SqlTypeName.INTEGER));
+ }
+
+ @Test public void testCannotConvertFromUnsupportedTypes() {
+ Reinterpreter reinterpreter = Reinterpreter.builder()
+ .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+ .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+ .build();
+
+ Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+ unsupportedTypes.removeAll(
+ Sets.newSet(SqlTypeName.DATE, SqlTypeName.SMALLINT, SqlTypeName.FLOAT));
+
+ for (SqlTypeName unsupportedType : unsupportedTypes) {
+ assertFalse(reinterpreter.canConvert(unsupportedType, SqlTypeName.DATE));
+ assertFalse(reinterpreter.canConvert(unsupportedType, SqlTypeName.INTEGER));
+ }
+ }
+
+ @Test public void testCannotConvertToUnsupportedTypes() {
+ Reinterpreter reinterpreter = Reinterpreter.builder()
+ .withConversion(mockConversion(SqlTypeName.SYMBOL, SqlTypeName.SMALLINT, SqlTypeName.DATE))
+ .withConversion(mockConversion(SqlTypeName.INTEGER, SqlTypeName.FLOAT))
+ .build();
+
+ Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+ unsupportedTypes.removeAll(Sets.newSet(SqlTypeName.SYMBOL, SqlTypeName.INTEGER));
+
+ for (SqlTypeName unsupportedType : unsupportedTypes) {
+ assertFalse(reinterpreter.canConvert(SqlTypeName.SMALLINT, unsupportedType));
+ assertFalse(reinterpreter.canConvert(SqlTypeName.DATE, unsupportedType));
+ assertFalse(reinterpreter.canConvert(SqlTypeName.FLOAT, unsupportedType));
+ }
+ }
+
+ @Test public void testConvert() {
+ Date date = new Date(12345L);
+ BeamSqlPrimitive stringPrimitive = BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello");
+ BeamSqlPrimitive datePrimitive = BeamSqlPrimitive.of(SqlTypeName.DATE, date);
+
+ ReinterpretConversion mockConversion = mock(ReinterpretConversion.class);
+ doReturn(Sets.newSet(SqlTypeName.VARCHAR)).when(mockConversion).from();
+ doReturn(SqlTypeName.DATE).when(mockConversion).to();
+ doReturn(datePrimitive).when(mockConversion).convert(same(stringPrimitive));
+
+ Reinterpreter reinterpreter = Reinterpreter.builder().withConversion(mockConversion).build();
+ BeamSqlPrimitive converted = reinterpreter.convert(SqlTypeName.DATE, stringPrimitive);
+
+ assertSame(datePrimitive, converted);
+ verify(mockConversion).convert(same(stringPrimitive));
+ }
+
+ @Test public void testConvertThrowsForUnsupportedFromType() {
+ thrown.expect(UnsupportedOperationException.class);
+
+ BeamSqlPrimitive intervalPrimitive = BeamSqlPrimitive
+ .of(SqlTypeName.INTERVAL_DAY, new BigDecimal(2));
+
+ Reinterpreter reinterpreter = newReinterpreter();
+ reinterpreter.convert(SqlTypeName.DATE, intervalPrimitive);
+ }
+
+ @Test public void testConvertThrowsForUnsupportedToType() {
+ thrown.expect(UnsupportedOperationException.class);
+
+ BeamSqlPrimitive stringPrimitive = BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello");
+
+ Reinterpreter reinterpreter = newReinterpreter();
+ reinterpreter.convert(SqlTypeName.INTERVAL_DAY, stringPrimitive);
+ }
+
+ private Reinterpreter newReinterpreter() {
+ return Reinterpreter.builder()
+ .withConversion(
+ mockConversion(
+ SqlTypeName.DATE,
+ SqlTypeName.SMALLINT, SqlTypeName.VARCHAR))
+ .build();
+ }
+
+ private ReinterpretConversion mockConversion(SqlTypeName convertTo, SqlTypeName ... convertFrom) {
+ ReinterpretConversion conversion = mock(ReinterpretConversion.class);
+
+ doReturn(Sets.newSet(convertFrom)).when(conversion).from();
+ doReturn(convertTo).when(conversion).to();
+
+ return conversion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fe7480a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 6937a18..ba901d3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -91,6 +91,58 @@ public class BeamSqlDateFunctionsIntegrationTest
checker.buildRunAndCheck();
}
+ @Test public void testTimestampDiff() throws Exception { // each addExpr pairs a TIMESTAMPDIFF(unit, t1, t2) expression with its expected value
+ ExpressionChecker checker = new ExpressionChecker() // SECOND: gaps of 0, 1 and 2 seconds
+ .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:01:58')", 0)
+ .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:01:59')", 1)
+ .addExpr("TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:02:00')", 2)
+ // MINUTE: one second short of a full minute expects 0; exact gaps of one and two minutes expect 1 and 2
+ .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:02:57')", 0)
+ .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:02:58')", 1)
+ .addExpr("TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 01:03:58')", 2)
+ // HOUR: one second short of a full hour expects 0; exact gaps of one and two hours expect 1 and 2
+ .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 02:01:57')", 0)
+ .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 02:01:58')", 1)
+ .addExpr("TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-19 03:01:58')", 2)
+ // DAY: one second short of a full day expects 0; exact gaps of one and two days expect 1 and 2
+ .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-20 01:01:57')", 0)
+ .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-20 01:01:58')", 1)
+ .addExpr("TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+ + "TIMESTAMP '1984-04-21 01:01:58')", 2)
+ // MONTH: one second short of the same day-and-time next month expects 0; exact gaps expect 1 and 2
+ .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+ + "TIMESTAMP '1984-02-19 01:01:57')", 0)
+ .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+ + "TIMESTAMP '1984-02-19 01:01:58')", 1)
+ .addExpr("TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+ + "TIMESTAMP '1984-03-19 01:01:58')", 2)
+ // YEAR: one second short of the same date-and-time next year expects 0; exact gaps expect 1 and 2
+ .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+ + "TIMESTAMP '1982-01-19 01:01:57')", 0)
+ .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+ + "TIMESTAMP '1982-01-19 01:01:58')", 1)
+ .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+ + "TIMESTAMP '1983-01-19 01:01:58')", 2)
+ // YEAR with the second timestamp earlier than the first: expected values are negative
+ .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+ + "TIMESTAMP '1980-01-19 01:01:58')", -1)
+ .addExpr("TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+ + "TIMESTAMP '1979-01-19 01:01:58')", -2)
+ ;
+ checker.buildRunAndCheck();
+ }
+
@Test public void testDateTimeFunctions_currentTime() throws Exception {
String sql = "SELECT "
+ "LOCALTIME as l,"
[2/2] beam git commit: This closes #4065
Posted by xu...@apache.org.
This closes #4065
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f80bf2fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f80bf2fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f80bf2fd
Branch: refs/heads/master
Commit: f80bf2fdff655b62deb458b1e57d867bfbf03f41
Parents: 269bf89 4fe7480
Author: James Xu <xu...@gmail.com>
Authored: Tue Nov 7 23:11:03 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Tue Nov 7 23:11:03 2017 +0800
----------------------------------------------------------------------
sdks/java/extensions/sql/pom.xml | 8 +
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 9 +-
.../operator/BeamSqlReinterpretExpression.java | 55 -----
.../date/BeamSqlDatetimeMinusExpression.java | 107 +++++++++
.../BeamSqlReinterpretExpression.java | 58 +++++
.../DatetimeReinterpretConversions.java | 59 +++++
.../IntegerReinterpretConversions.java | 44 ++++
.../reinterpret/ReinterpretConversion.java | 114 +++++++++
.../operator/reinterpret/Reinterpreter.java | 101 ++++++++
.../operator/reinterpret/package-info.java | 22 ++
.../impl/interpreter/BeamSqlFnExecutorTest.java | 14 ++
.../BeamSqlReinterpretExpressionTest.java | 116 +++++++---
.../BeamSqlDatetimeMinusExpressionTest.java | 232 +++++++++++++++++++
.../DatetimeReinterpretConversionsTest.java | 73 ++++++
.../IntegerReinterpretConversionsTest.java | 81 +++++++
.../reinterpret/ReinterpretConversionTest.java | 116 ++++++++++
.../operator/reinterpret/ReinterpreterTest.java | 155 +++++++++++++
.../BeamSqlDateFunctionsIntegrationTest.java | 52 +++++
18 files changed, 1330 insertions(+), 86 deletions(-)
----------------------------------------------------------------------