You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/08 06:11:33 UTC
[1/2] beam git commit: [BEAM-2203] Implement 'timestamp - interval'
Repository: beam
Updated Branches:
refs/heads/master 299429394 -> e0166ceb6
[BEAM-2203] Implement 'timestamp - interval'
Move TIMESTAMPDIFF implementation into BeamSqlTimestampMinusTimestampExpression, add BeamSqlTimestampMinusIntervalExpression
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a63eb21
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a63eb21
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a63eb21
Branch: refs/heads/master
Commit: 0a63eb2195575f6cc5ee228d4edb1ed9571f9f5f
Parents: 29942939
Author: Anton Kedin <ke...@google.com>
Authored: Sun Nov 5 23:26:10 2017 -0800
Committer: James Xu <xu...@gmail.com>
Committed: Wed Nov 8 11:58:34 2017 +0800
----------------------------------------------------------------------
.../date/BeamSqlDatetimeMinusExpression.java | 104 ++++-----
...BeamSqlTimestampMinusIntervalExpression.java | 83 +++++++
...eamSqlTimestampMinusTimestampExpression.java | 97 ++++++++
.../BeamSqlDatetimeMinusExpressionTest.java | 189 ++++-----------
...SqlTimestampMinusIntervalExpressionTest.java | 163 +++++++++++++
...qlTimestampMinusTimestampExpressionTest.java | 233 +++++++++++++++++++
.../BeamSqlDateFunctionsIntegrationTest.java | 18 ++
7 files changed, 691 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
index 6a5cbb1..6948ba1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
@@ -28,80 +28,80 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimi
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
import org.joda.time.DurationFieldType;
-import org.joda.time.Period;
-import org.joda.time.PeriodType;
/**
* Infix '-' operation for timestamps.
*
- * <p>Currently this implementation is specific to how Calcite implements 'TIMESTAMPDIFF(..)'.
- * It converts the TIMESTAMPDIFF() call into infix minus and normalizes it
- * with corresponding TimeUnit's multiplier.
+ * <p>Implements 2 SQL subtraction operations at the moment:
+ * 'timestampdiff(timeunit, timestamp, timestamp)', and 'timestamp - interval'
*
- * <p>In addition to this TIMESTAMPDIFF(..) implementation, Calcite also supports infix
- * operations 'interval - interval' and 'timestamp - interval'.
- * These are not implemented yet.
+ * <p>Calcite converts both of the above into infix '-' expression, with different operands and
+ * return types.
+ *
+ * <p>This class delegates evaluation to specific implementation of one of the above operations,
+ * see {@link BeamSqlTimestampMinusTimestampExpression}
+ * and {@link BeamSqlTimestampMinusIntervalExpression}
+ *
+ * <p>Calcite supports one more subtraction kind: 'interval - interval',
+ * but it is not implemented yet.
*/
public class BeamSqlDatetimeMinusExpression extends BeamSqlExpression {
- private SqlTypeName intervalType;
- private static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
+ static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
ImmutableMap.<SqlTypeName, DurationFieldType>builder()
- .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
- .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
- .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
- .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
- .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
- .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
- .build();
-
- public BeamSqlDatetimeMinusExpression(
- List<BeamSqlExpression> operands, SqlTypeName intervalType) {
- super(operands, SqlTypeName.BIGINT);
- this.intervalType = intervalType;
+ .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
+ .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
+ .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
+ .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
+ .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
+ .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
+ .build();
+
+ private BeamSqlExpression delegateExpression;
+
+ public BeamSqlDatetimeMinusExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+
+ this.delegateExpression = createDelegateExpression(operands, outputType);
}
- /**
- * Requires exactly 2 operands. One should be a timestamp, another an interval
- */
- @Override
- public boolean accept() {
- return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
- && operands.size() == 2
- && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
- && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+ private BeamSqlExpression createDelegateExpression(
+ List<BeamSqlExpression> operands, SqlTypeName outputType) {
+
+ if (isTimestampMinusTimestamp(operands, outputType)) {
+ return new BeamSqlTimestampMinusTimestampExpression(operands, outputType);
+ } else if (isTimestampMinusInterval(operands, outputType)) {
+ return new BeamSqlTimestampMinusIntervalExpression(operands, outputType);
+ }
+
+ return null;
}
- /**
- * Returns the count of intervals between dates, times TimeUnit.multiplier of the interval type.
- * Calcite deals with all intervals this way. Whenever there is an interval, its value is always
- * multiplied by the corresponding TimeUnit.multiplier
- */
- public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
- DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
- DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+ private boolean isTimestampMinusTimestamp(
+ List<BeamSqlExpression> operands, SqlTypeName outputType) {
+
+ return BeamSqlTimestampMinusTimestampExpression.accept(operands, outputType);
+ }
- long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
- long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+ private boolean isTimestampMinusInterval(
+ List<BeamSqlExpression> operands, SqlTypeName outputType) {
- return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+ return BeamSqlTimestampMinusIntervalExpression.accept(operands, outputType);
}
- private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
- Period period = new Period(timestampStart, timestampEnd,
- PeriodType.forFields(new DurationFieldType[] { durationFieldType(intervalType) }));
- return period.get(durationFieldType(intervalType));
+ @Override
+ public boolean accept() {
+ return delegateExpression != null && delegateExpression.accept();
}
- private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
- if (!INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
- throw new IllegalArgumentException("Counting "
- + intervalTypeToCount.getName() + "s between dates is not supported");
+ @Override
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ if (delegateExpression == null) {
+ throw new IllegalStateException("Unable to execute unsupported 'datetime minus' expression");
}
- return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+ return delegateExpression.evaluate(inputRow, window);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
new file mode 100644
index 0000000..236d148
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+
+/**
+ * '-' operator for 'timestamp - interval' expressions.
+ *
+ * <p>See {@link BeamSqlDatetimeMinusExpression} for other kinds of datetime types subtraction.
+ */
+public class BeamSqlTimestampMinusIntervalExpression extends BeamSqlExpression {
+
+ public BeamSqlTimestampMinusIntervalExpression(
+ List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override
+ public boolean accept() {
+ return accept(operands, outputType);
+ }
+
+ static boolean accept(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ return operands.size() == 2
+ && SqlTypeName.TIMESTAMP.equals(outputType)
+ && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+ && INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamRecord row, BoundedWindow window) {
+ DateTime date = new DateTime(opValueEvaluated(0, row, window));
+ Period period = intervalToPeriod(op(1).evaluate(row, window));
+
+ Date subtractionResult = date.minus(period).toDate();
+
+ return BeamSqlPrimitive.of(outputType, subtractionResult);
+ }
+
+ private Period intervalToPeriod(BeamSqlPrimitive operand) {
+ BigDecimal intervalValue = operand.getDecimal();
+ SqlTypeName intervalType = operand.getOutputType();
+
+ int numberOfIntervals = intervalValue
+ .divide(TimeUnitUtils.timeUnitInternalMultiplier(intervalType)).intValueExact();
+
+ return new Period().withField(durationFieldType(intervalType), numberOfIntervals);
+ }
+
+ private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+ return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
new file mode 100644
index 0000000..64ac9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+
+import java.util.List;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+
+/**
+ * Infix '-' operation for timestamps.
+ *
+ * <p>Currently this implementation is specific to how Calcite parses 'TIMESTAMPDIFF(..)'.
+ * It converts the TIMESTAMPDIFF() call into infix minus and normalizes it
+ * with corresponding TimeUnit's multiplier.
+ *
+ * <p>See {@link BeamSqlDatetimeMinusExpression} for other kinds of datetime types subtraction.
+ */
+public class BeamSqlTimestampMinusTimestampExpression extends BeamSqlExpression {
+ private SqlTypeName intervalType;
+
+ public BeamSqlTimestampMinusTimestampExpression(
+ List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+ super(operands, SqlTypeName.BIGINT);
+ this.intervalType = intervalType;
+ }
+
+ /**
+ * Requires exactly 2 operands. One should be a timestamp, another an interval
+ */
+ @Override
+ public boolean accept() {
+ return accept(operands, intervalType);
+ }
+
+ static boolean accept(List<BeamSqlExpression> operands, SqlTypeName intervalType) {
+ return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
+ && operands.size() == 2
+ && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
+ && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
+ }
+
+ /**
+ * Returns the count of intervals between dates, times TimeUnit.multiplier of the interval type.
+ * Calcite deals with all intervals this way. Whenever there is an interval, its value is always
+ * multiplied by the corresponding TimeUnit.multiplier
+ */
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ DateTime timestampStart = new DateTime(opValueEvaluated(1, inputRow, window));
+ DateTime timestampEnd = new DateTime(opValueEvaluated(0, inputRow, window));
+
+ long numberOfIntervals = numberOfIntervalsBetweenDates(timestampStart, timestampEnd);
+ long multiplier = TimeUnitUtils.timeUnitInternalMultiplier(intervalType).longValue();
+
+ return BeamSqlPrimitive.of(SqlTypeName.BIGINT, multiplier * numberOfIntervals);
+ }
+
+ private long numberOfIntervalsBetweenDates(DateTime timestampStart, DateTime timestampEnd) {
+ Period period = new Period(timestampStart, timestampEnd,
+ PeriodType.forFields(new DurationFieldType[] { durationFieldType(intervalType) }));
+ return period.get(durationFieldType(intervalType));
+ }
+
+ private static DurationFieldType durationFieldType(SqlTypeName intervalTypeToCount) {
+ if (!INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
+ throw new IllegalArgumentException("Counting "
+ + intervalTypeToCount.getName() + "s between dates is not supported");
+ }
+
+ return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
index e0fe166..ef837ca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Date;
@@ -32,201 +33,101 @@ import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.DateTime;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
/**
* Unit tests for {@link BeamSqlDatetimeMinusExpression}.
*/
public class BeamSqlDatetimeMinusExpressionTest {
+
private static final BeamRecord NULL_ROW = null;
private static final BoundedWindow NULL_WINDOW = null;
- private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
+ private static final Date DATE = new Date(329281L);
private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
- private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
- private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
- private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
- private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
- private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
- @Rule public ExpectedException thrown = ExpectedException.none();
+ private static final BeamSqlPrimitive TIMESTAMP = BeamSqlPrimitive.of(
+ SqlTypeName.TIMESTAMP, DATE);
- @Test public void testOutputTypeIsBigint() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE));
+ private static final BeamSqlPrimitive TIMESTAMP_MINUS_2_SEC = BeamSqlPrimitive.of(
+ SqlTypeName.TIMESTAMP, DATE_MINUS_2_SEC);
- assertEquals(SqlTypeName.BIGINT, minusExpression.getOutputType());
- }
+ private static final BeamSqlPrimitive INTERVAL_2_SEC = BeamSqlPrimitive.of(
+ SqlTypeName.INTERVAL_SECOND, TimeUnit.SECOND.multiplier.multiply(new BigDecimal(2)));
- @Test public void testAccepts2Timestamps() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE));
+ private static final BeamSqlPrimitive STRING = BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR, "hello");
- assertTrue(minusExpression.accept());
- }
+ private static final BeamSqlPrimitive INTERVAL_3_MONTHS = BeamSqlPrimitive.of(
+ SqlTypeName.INTERVAL_MONTH, TimeUnit.MONTH.multiplier.multiply(new BigDecimal(3)));
- @Test public void testDoesNotAccept3Timestamps() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE_MINUS_1_YEAR),
- timestamp(DATE));
+ @Test public void testOutputType() {
+ BeamSqlDatetimeMinusExpression minusExpression1 =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+ BeamSqlDatetimeMinusExpression minusExpression2 =
+ minusExpression(SqlTypeName.BIGINT, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
- assertFalse(minusExpression.accept());
+ assertEquals(SqlTypeName.TIMESTAMP, minusExpression1.getOutputType());
+ assertEquals(SqlTypeName.BIGINT, minusExpression2.getOutputType());
}
- @Test public void testDoesNotAccept1Timestamp() {
+ @Test public void testAcceptsTimestampMinusTimestamp() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE));
+ minusExpression(SqlTypeName.INTERVAL_SECOND, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
- assertFalse(minusExpression.accept());
+ assertTrue(minusExpression.accept());
}
- @Test public void testDoesNotAcceptUnsupportedIntervalToCount() {
+ @Test public void testAcceptsTimestampMinusInteval() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY_MINUTE,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE));
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
- assertFalse(minusExpression.accept());
+ assertTrue(minusExpression.accept());
}
- @Test public void testDoesNotAcceptNotTimestampAsOperandOne() {
+ @Test public void testDoesNotAcceptUnsupportedReturnType() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
- timestamp(DATE));
+ minusExpression(SqlTypeName.BIGINT, TIMESTAMP, INTERVAL_2_SEC);
assertFalse(minusExpression.accept());
}
- @Test public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+ @Test public void testDoesNotAcceptUnsupportedFirstOperand() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE),
- BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+ minusExpression(SqlTypeName.TIMESTAMP, STRING, INTERVAL_2_SEC);
assertFalse(minusExpression.accept());
}
- @Test public void testEvaluateDiffSeconds() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_SECOND,
- timestamp(DATE),
- timestamp(DATE_MINUS_2_SEC));
-
- long expectedResult = applyMultiplier(2L, TimeUnit.SECOND);
- assertEquals(expectedResult, eval(minusExpression));
- }
-
- @Test public void testEvaluateDiffMinutes() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_MINUTE,
- timestamp(DATE),
- timestamp(DATE_MINUS_3_MIN));
-
- long expectedResult = applyMultiplier(3L, TimeUnit.MINUTE);
- assertEquals(expectedResult, eval(minusExpression));
- }
-
- @Test public void testEvaluateDiffHours() {
+ @Test public void testDoesNotAcceptUnsupportedSecondOperand() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_HOUR,
- timestamp(DATE),
- timestamp(DATE_MINUS_4_HOURS));
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, STRING);
- long expectedResult = applyMultiplier(4L, TimeUnit.HOUR);
- assertEquals(expectedResult, eval(minusExpression));
+ assertFalse(minusExpression.accept());
}
- @Test public void testEvaluateDiffDays() {
+ @Test public void testEvaluateTimestampMinusTimestamp() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY,
- timestamp(DATE),
- timestamp(DATE_MINUS_7_DAYS));
+ minusExpression(SqlTypeName.INTERVAL_SECOND, TIMESTAMP, TIMESTAMP_MINUS_2_SEC);
- long expectedResult = applyMultiplier(7L, TimeUnit.DAY);
- assertEquals(expectedResult, eval(minusExpression));
- }
+ BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
- @Test public void testEvaluateDiffMonths() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_MONTH,
- timestamp(DATE),
- timestamp(DATE_MINUS_2_MONTHS));
-
- long expectedResult = applyMultiplier(2L, TimeUnit.MONTH);
- assertEquals(expectedResult, eval(minusExpression));
+ assertEquals(SqlTypeName.BIGINT, subtractionResult.getOutputType());
+ assertEquals(2L * TimeUnit.SECOND.multiplier.longValue(), subtractionResult.getLong());
}
- @Test public void testEvaluateDiffYears() {
+ @Test public void testEvaluateTimestampMinusInteval() {
BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_YEAR,
- timestamp(DATE),
- timestamp(DATE_MINUS_1_YEAR));
-
- long expectedResult = applyMultiplier(1L, TimeUnit.YEAR);
- assertEquals(expectedResult, eval(minusExpression));
- }
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
- @Test public void testEvaluateNegativeDiffSeconds() {
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_SECOND,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE));
+ BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
- long expectedResult = applyMultiplier(-2L, TimeUnit.SECOND);
- assertEquals(expectedResult, eval(minusExpression));
- }
-
- @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
-
- thrown.expect(IllegalArgumentException.class);
-
- BeamSqlDatetimeMinusExpression minusExpression =
- minusExpression(
- SqlTypeName.INTERVAL_DAY_MINUTE,
- timestamp(DATE_MINUS_2_SEC),
- timestamp(DATE));
-
- eval(minusExpression);
+ assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+ assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
}
private static BeamSqlDatetimeMinusExpression minusExpression(
- SqlTypeName intervalsToCount, BeamSqlExpression ... operands) {
- return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), intervalsToCount);
- }
-
- private BeamSqlExpression timestamp(Date date) {
- return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
- }
-
- private long eval(BeamSqlDatetimeMinusExpression minusExpression) {
- return minusExpression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
- }
-
- private long applyMultiplier(long value, TimeUnit timeUnit) {
- return value * timeUnit.multiplier.longValue();
+ SqlTypeName outputType, BeamSqlExpression ... operands) {
+ return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), outputType);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
new file mode 100644
index 0000000..5232487
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlTimestampMinusIntervalExpression}.
+ */
+public class BeamSqlTimestampMinusIntervalExpressionTest {
+ private static final BeamRecord NULL_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+
+ private static final Date DATE = new Date(329281L);
+ private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+
+ private static final BeamSqlPrimitive TIMESTAMP = BeamSqlPrimitive.of(
+ SqlTypeName.TIMESTAMP, DATE);
+
+ private static final BeamSqlPrimitive INTERVAL_2_SEC = BeamSqlPrimitive.of(
+ SqlTypeName.INTERVAL_SECOND, TimeUnit.SECOND.multiplier.multiply(new BigDecimal(2)));
+
+ private static final BeamSqlPrimitive INTERVAL_3_MONTHS = BeamSqlPrimitive.of(
+ SqlTypeName.INTERVAL_MONTH, TimeUnit.MONTH.multiplier.multiply(new BigDecimal(3)));
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test public void testBasicProperties() {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.INTERVAL_DAY_MINUTE, TIMESTAMP, INTERVAL_3_MONTHS);
+
+ assertEquals(SqlTypeName.INTERVAL_DAY_MINUTE, minusExpression.getOutputType());
+ assertEquals(Arrays.asList(TIMESTAMP, INTERVAL_3_MONTHS), minusExpression.getOperands());
+ }
+
+ @Test public void testAcceptsHappyPath() {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+
+ assertTrue(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptOneOperand() {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP);
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptThreeOperands() {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC, INTERVAL_3_MONTHS);
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptWrongOutputType() {
+ Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+ unsupportedTypes.remove(SqlTypeName.TIMESTAMP);
+
+ for (SqlTypeName unsupportedType : unsupportedTypes) {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(unsupportedType, TIMESTAMP, INTERVAL_2_SEC);
+
+ assertFalse(minusExpression.accept());
+ }
+ }
+
+ @Test public void testDoesNotAcceptWrongFirstOperand() {
+ Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+ unsupportedTypes.remove(SqlTypeName.TIMESTAMP);
+
+ for (SqlTypeName unsupportedType : unsupportedTypes) {
+ BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+ doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, unsupportedOperand, INTERVAL_2_SEC);
+
+ assertFalse(minusExpression.accept());
+ }
+ }
+
+ @Test public void testDoesNotAcceptWrongSecondOperand() {
+ Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+ unsupportedTypes.removeAll(INTERVALS_DURATIONS_TYPES.keySet());
+
+ for (SqlTypeName unsupportedType : unsupportedTypes) {
+ BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+ doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, unsupportedOperand);
+
+ assertFalse(minusExpression.accept());
+ }
+ }
+
+ @Test public void testAcceptsAllSupportedIntervalTypes() {
+ for (SqlTypeName unsupportedType : INTERVALS_DURATIONS_TYPES.keySet()) {
+ BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+ doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, unsupportedOperand);
+
+ assertTrue(minusExpression.accept());
+ }
+ }
+
+ @Test public void testEvaluateHappyPath() {
+ BeamSqlTimestampMinusIntervalExpression minusExpression =
+ minusExpression(SqlTypeName.TIMESTAMP, TIMESTAMP, INTERVAL_2_SEC);
+
+ BeamSqlPrimitive subtractionResult = minusExpression.evaluate(NULL_ROW, NULL_WINDOW);
+
+ assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+ assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
+ }
+
+ private static BeamSqlTimestampMinusIntervalExpression minusExpression(
+ SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+ return new BeamSqlTimestampMinusIntervalExpression(Arrays.asList(operands), intervalsToCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
new file mode 100644
index 0000000..54bf52d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlTimestampMinusTimestampExpression}.
+ */
+public class BeamSqlTimestampMinusTimestampExpressionTest {
+
+ private static final BeamRecord NULL_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+
+ private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
+ private static final Date DATE_MINUS_2_SEC = new DateTime(DATE).minusSeconds(2).toDate();
+ private static final Date DATE_MINUS_3_MIN = new DateTime(DATE).minusMinutes(3).toDate();
+ private static final Date DATE_MINUS_4_HOURS = new DateTime(DATE).minusHours(4).toDate();
+ private static final Date DATE_MINUS_7_DAYS = new DateTime(DATE).minusDays(7).toDate();
+ private static final Date DATE_MINUS_2_MONTHS = new DateTime(DATE).minusMonths(2).toDate();
+ private static final Date DATE_MINUS_1_YEAR = new DateTime(DATE).minusYears(1).toDate();
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test public void testOutputTypeIsBigint() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE));
+
+ assertEquals(SqlTypeName.BIGINT, minusExpression.getOutputType());
+ }
+
+ @Test public void testAccepts2Timestamps() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE));
+
+ assertTrue(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAccept3Timestamps() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE_MINUS_1_YEAR),
+ timestamp(DATE));
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAccept1Timestamp() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE));
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptUnsupportedIntervalToCount() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY_MINUTE,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE));
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptNotTimestampAsOperandOne() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+ timestamp(DATE));
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testDoesNotAcceptNotTimestampAsOperandTwo() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+
+ assertFalse(minusExpression.accept());
+ }
+
+ @Test public void testEvaluateDiffSeconds() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_SECOND,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_2_SEC));
+
+ long expectedResult = applyMultiplier(2L, TimeUnit.SECOND);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateDiffMinutes() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_MINUTE,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_3_MIN));
+
+ long expectedResult = applyMultiplier(3L, TimeUnit.MINUTE);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateDiffHours() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_HOUR,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_4_HOURS));
+
+ long expectedResult = applyMultiplier(4L, TimeUnit.HOUR);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateDiffDays() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_7_DAYS));
+
+ long expectedResult = applyMultiplier(7L, TimeUnit.DAY);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateDiffMonths() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_MONTH,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_2_MONTHS));
+
+ long expectedResult = applyMultiplier(2L, TimeUnit.MONTH);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateDiffYears() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_YEAR,
+ timestamp(DATE),
+ timestamp(DATE_MINUS_1_YEAR));
+
+ long expectedResult = applyMultiplier(1L, TimeUnit.YEAR);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateNegativeDiffSeconds() {
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_SECOND,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE));
+
+ long expectedResult = applyMultiplier(-2L, TimeUnit.SECOND);
+ assertEquals(expectedResult, eval(minusExpression));
+ }
+
+ @Test public void testEvaluateThrowsForUnsupportedIntervalType() {
+
+ thrown.expect(IllegalArgumentException.class);
+
+ BeamSqlTimestampMinusTimestampExpression minusExpression =
+ minusExpression(
+ SqlTypeName.INTERVAL_DAY_MINUTE,
+ timestamp(DATE_MINUS_2_SEC),
+ timestamp(DATE));
+
+ eval(minusExpression);
+ }
+
+ private static BeamSqlTimestampMinusTimestampExpression minusExpression(
+ SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+ return new BeamSqlTimestampMinusTimestampExpression(Arrays.asList(operands), intervalsToCount);
+ }
+
+ private BeamSqlExpression timestamp(Date date) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, date);
+ }
+
+ private long eval(BeamSqlTimestampMinusTimestampExpression minusExpression) {
+ return minusExpression.evaluate(NULL_ROW, NULL_WINDOW).getLong();
+ }
+
+ private long applyMultiplier(long value, TimeUnit timeUnit) {
+ return value * timeUnit.multiplier.longValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a63eb21/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index ba901d3..ec5b295 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -143,6 +143,24 @@ public class BeamSqlDateFunctionsIntegrationTest
checker.buildRunAndCheck();
}
+ @Test public void testTimestampMinusInterval() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND",
+ parseDate("1984-04-19 01:01:56"))
+ .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE",
+ parseDate("1984-04-19 01:00:58"))
+ .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR",
+ parseDate("1984-04-18 21:01:58"))
+ .addExpr("TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY",
+ parseDate("1984-04-14 01:01:58"))
+ .addExpr("TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH",
+ parseDate("1983-11-19 01:01:58"))
+ .addExpr("TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
+ parseDate("1983-01-19 01:01:58"))
+ ;
+ checker.buildRunAndCheck();
+ }
+
@Test public void testDateTimeFunctions_currentTime() throws Exception {
String sql = "SELECT "
+ "LOCALTIME as l,"
[2/2] beam git commit: This closes #4082
Posted by xu...@apache.org.
This closes #4082
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0166ceb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0166ceb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0166ceb
Branch: refs/heads/master
Commit: e0166ceb657b393e4037ac7c178797a864379745
Parents: 29942939 0a63eb2
Author: James Xu <xu...@gmail.com>
Authored: Wed Nov 8 11:59:07 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Wed Nov 8 11:59:07 2017 +0800
----------------------------------------------------------------------
.../date/BeamSqlDatetimeMinusExpression.java | 104 ++++-----
...BeamSqlTimestampMinusIntervalExpression.java | 83 +++++++
...eamSqlTimestampMinusTimestampExpression.java | 97 ++++++++
.../BeamSqlDatetimeMinusExpressionTest.java | 189 ++++-----------
...SqlTimestampMinusIntervalExpressionTest.java | 163 +++++++++++++
...qlTimestampMinusTimestampExpressionTest.java | 233 +++++++++++++++++++
.../BeamSqlDateFunctionsIntegrationTest.java | 18 ++
7 files changed, 691 insertions(+), 196 deletions(-)
----------------------------------------------------------------------