You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/08 14:36:27 UTC
[flink] 02/02: [FLINK-12844][table-planner-blink] Use default
conversion class LocalDate/LocalTime/LocalDateTime for
DateType/TimeType/TimestampType
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e09d9479beb9b38e1de1f0662000e8c01336480
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Jun 17 14:00:07 2019 +0800
[FLINK-12844][table-planner-blink] Use default conversion class LocalDate/LocalTime/LocalDateTime for DateType/TimeType/TimestampType
This closes #8762
---
.../java/org/apache/flink/table/api/Types.java | 21 +
.../utils/LegacyTypeInfoDataTypeConverter.java | 5 +
.../flink/table/codegen/CodeGeneratorContext.scala | 64 ++-
.../apache/flink/table/codegen/GenerateUtils.scala | 8 +-
.../flink/table/codegen/calls/BuiltInMethods.scala | 106 ++++-
.../codegen/calls/CurrentTimePointCallGen.scala | 3 +-
.../flink/table/codegen/calls/ExtractCallGen.scala | 115 ++++++
.../table/codegen/calls/FunctionGenerator.scala | 28 +-
.../table/codegen/calls/ScalarOperatorGens.scala | 29 +-
.../apache/flink/table/expressions/literals.scala | 6 +-
.../stream/StreamExecGroupWindowAggregate.scala | 9 +-
.../flink/table/plan/util/RexNodeExtractor.scala | 18 +-
.../table/sources/tsextractors/ExistingField.scala | 12 +-
.../flink/table/plan/batch/sql/TableSourceTest.xml | 2 +-
.../table/plan/stream/sql/TableSourceTest.xml | 2 +-
.../table/expressions/TemporalTypesTest.scala | 43 +-
.../expressions/utils/ArrayTypeTestBase.scala | 4 +-
.../table/expressions/utils/RowTypeTestBase.scala | 6 +-
.../utils/ScalarOperatorsTestBase.scala | 4 +-
.../expressions/utils/ScalarTypesTestBase.scala | 26 +-
.../table/plan/batch/sql/TableSourceTest.scala | 15 +-
.../plan/batch/sql/agg/AggregateTestBase.scala | 2 +-
.../common/AggregateReduceGroupingTestBase.scala | 2 +-
.../table/plan/stream/sql/TableSourceTest.scala | 31 +-
.../table/plan/stream/sql/agg/AggregateTest.scala | 2 +-
.../plan/stream/sql/join/LookupJoinTest.scala | 19 +-
.../table/plan/util/RexNodeExtractorTest.scala | 12 +-
.../flink/table/runtime/batch/sql/CalcITCase.scala | 145 ++++---
.../table/runtime/batch/sql/CorrelateITCase.scala | 7 +-
.../table/runtime/batch/sql/OverWindowITCase.scala | 98 ++---
.../table/runtime/batch/sql/TableScanITCase.scala | 26 +-
.../table/runtime/batch/sql/UnnestITCase.scala | 6 +-
.../sql/agg/AggregateReduceGroupingITCase.scala | 43 +-
.../runtime/batch/sql/agg/GroupingSetsITCase.scala | 42 +-
.../batch/sql/agg/WindowAggregateITCase.scala | 458 +++++++++++----------
.../table/runtime/batch/table/CalcITCase.scala | 25 +-
.../runtime/batch/table/CorrelateITCase.scala | 7 +-
.../runtime/batch/table/GroupWindowITCase.scala | 102 ++---
.../runtime/batch/table/OverWindowITCase.scala | 30 +-
.../table/runtime/stream/sql/AggregateITCase.scala | 97 ++---
.../runtime/stream/sql/MatchRecognizeITCase.scala | 6 +-
.../table/runtime/stream/sql/TableScanITCase.scala | 4 +-
.../runtime/stream/sql/TableSourceITCase.scala | 28 +-
.../runtime/stream/sql/WindowAggregateITCase.scala | 44 +-
.../runtime/stream/sql/WindowJoinITCase.scala | 16 +-
.../flink/table/runtime/utils/BatchTestBase.scala | 2 -
.../flink/table/runtime/utils/StreamTestSink.scala | 2 -
.../flink/table/runtime/utils/TestData.scala | 125 +++---
.../utils/UserDefinedFunctionTestUtils.scala | 39 +-
.../apache/flink/table/util/DateTimeTestUtil.scala | 17 +-
.../api/validation/TableSourceValidationTest.scala | 36 +-
.../table/dataformat/DataFormatConverters.java | 89 +++-
.../table/runtime/functions/SqlDateTimeUtils.java | 259 +++++++++---
.../runtime/window/WindowOperatorBuilder.java | 10 +-
.../table/types/ClassLogicalTypeConverter.java | 95 +----
.../table/types/LogicalTypeDataTypeConverter.java | 33 +-
.../flink/table/typeutils/BaseRowTypeInfo.java | 8 +-
.../table/dataformat/DataFormatConvertersTest.java | 16 +-
.../table/runtime/window/WindowOperatorTest.java | 18 +-
.../window/grouping/HeapWindowsGroupingTest.java | 4 +-
60 files changed, 1471 insertions(+), 1060 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
index f318217..a4284af 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
@@ -130,6 +130,27 @@ public final class Types {
}
/**
+ * Returns type information for a Table API LocalDate type.
+ */
+ public static TypeInformation<java.time.LocalDate> LOCAL_DATE() {
+ return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE;
+ }
+
+ /**
+ * Returns type information for a Table API LocalTime type.
+ */
+ public static TypeInformation<java.time.LocalTime> LOCAL_TIME() {
+ return org.apache.flink.api.common.typeinfo.Types.LOCAL_TIME;
+ }
+
+ /**
+ * Returns type information for a Table API LocalDateTime type.
+ */
+ public static TypeInformation<java.time.LocalDateTime> LOCAL_DATE_TIME() {
+ return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
+ }
+
+ /**
* Returns type information for a Table API interval of months.
*/
public static TypeInformation<Integer> INTERVAL_MONTHS() {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
index 8279860..e9a56e6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -48,7 +48,9 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
@@ -99,6 +101,9 @@ public final class LegacyTypeInfoDataTypeConverter {
addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
+ addMapping(Types.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
+ addMapping(Types.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
+ addMapping(Types.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
index cd66414..d1fa821 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GenerateUtils.generateRecordStatement
-import org.apache.flink.table.dataformat.GenericRow
+import org.apache.flink.table.dataformat.{DataFormatConverters, GenericRow}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.runtime.TableStreamOperator
import org.apache.flink.table.runtime.util.collections._
@@ -433,22 +433,21 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable local timestamp to the beginning of the SAM of the generated class.
- */
- def addReusableLocalTimestamp(): String = {
- addReusableTimestamp()
- }
-
- /**
- * Adds a reusable time to the beginning of the SAM of the generated class.
+ * Adds a reusable time to the beginning of the SAM of the generated [[Function]].
*/
def addReusableTime(): String = {
val fieldTerm = s"time"
+
val timestamp = addReusableTimestamp()
+
+ // declaration
+ reusableMemberStatements.add(s"private int $fieldTerm;")
+
+ // assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
- |final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
+ |$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
@@ -458,18 +457,42 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
+ * Adds a reusable local date time to the beginning of the SAM of the generated class.
+ */
+ def addReusableLocalDateTime(): String = {
+ val fieldTerm = s"localtimestamp"
+
+ val timestamp = addReusableTimestamp()
+
+ // declaration
+ reusableMemberStatements.add(s"private long $fieldTerm;")
+
+ // assignment
+ val field =
+ s"""
+ |$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
* Adds a reusable local time to the beginning of the SAM of the generated class.
*/
def addReusableLocalTime(): String = {
val fieldTerm = s"localtime"
- val timeZone = addReusableTimeZone()
- val localtimestamp = addReusableLocalTimestamp()
+
+ val localtimestamp = addReusableLocalDateTime()
+
+ // declaration
+ reusableMemberStatements.add(s"private int $fieldTerm;")
+
+ // assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
- s"""
- |final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp))
- | % ${DateTimeUtils.MILLIS_PER_DAY});
- |""".stripMargin
+ s"""
+ |$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
+ |""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}
@@ -479,15 +502,18 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableDate(): String = {
val fieldTerm = s"date"
+
val timestamp = addReusableTimestamp()
val time = addReusableTime()
- val timeZone = addReusableTimeZone()
+ // declaration
+ reusableMemberStatements.add(s"private int $fieldTerm;")
+
+ // assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
- |final int $fieldTerm = (int) (($timestamp + $timeZone.getOffset($timestamp))
- | / ${DateTimeUtils.MILLIS_PER_DAY});
+ |$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
index 70db90a..85ad1f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
@@ -369,14 +369,8 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
- // Hack
- // Currently, in RexLiteral/SqlLiteral(Calcite), TimestampString has no time zone.
- // TimeString, DateString TimestampString are treated as UTC time/(unix time)
- // when they are converted/formatted/validated
- // Here, we adjust millis before Calcite solve TimeZone perfectly
val millis = literalValue.asInstanceOf[Long]
- val adjustedValue = millis - ctx.tableConfig.getTimeZone.getOffset(millis)
- generateNonNullLiteral(literalType, adjustedValue.toString + "L", adjustedValue)
+ generateNonNullLiteral(literalType, millis + "L", millis)
case INTERVAL_YEAR_MONTH =>
val decimal = BigDecimal(literalValue.asInstanceOf[JBigDecimal])
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 3d0e800..ab5f801 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -219,6 +219,11 @@ object BuiltInMethods {
val TIMESTAMP_TO_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampToString",
+ classOf[Long], classOf[Int])
+
+ val TIMESTAMP_TO_STRING_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils],
+ "timestampToString",
classOf[Long], classOf[Int], classOf[TimeZone])
val STRING_TO_DATE_WITH_FORMAT = Types.lookupMethod(
@@ -232,24 +237,43 @@ object BuiltInMethods {
val NOW_OFFSET = Types.lookupMethod(
classOf[SqlDateTimeUtils], "now", classOf[Long])
- val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
+ val DATE_FORMAT_STRING_STRING_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
classOf[String], classOf[String], classOf[TimeZone])
- val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
+ val DATE_FORMAT_STIRNG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String], classOf[TimeZone])
- val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
+ val DATE_FORMAT_LONG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String], classOf[TimeZone])
+ val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
+ classOf[String], classOf[String])
+
+ val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String])
+
+ val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String])
+
val UNIX_TIMESTAMP_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"unixTimestamp",
classOf[String],
+ classOf[String])
+
+ val UNIX_TIMESTAMP_FORMAT_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils],
+ "unixTimestamp",
+ classOf[String],
classOf[String],
classOf[TimeZone])
val UNIX_TIMESTAMP_STR = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String])
+
+ val UNIX_TIMESTAMP_STR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String], classOf[TimeZone])
val UNIX_TIMESTAMP = Types.lookupMethod(
@@ -259,41 +283,77 @@ object BuiltInMethods {
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long])
val FROM_UNIXTIME_FORMAT = Types.lookupMethod(
- classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String])
val FROM_UNIXTIME = Types.lookupMethod(
- classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long])
val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod(
- classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double])
val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal])
+
+ val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
+
+ val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
+
+ val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
+
+ val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone])
- val DATEDIFF_T_S = Types.lookupMethod(
+ val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone])
- val DATEDIFF_S_S = Types.lookupMethod(
+ val DATEDIFF_S_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String], classOf[TimeZone])
- val DATEDIFF_S_T = Types.lookupMethod(
+ val DATEDIFF_S_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long], classOf[TimeZone])
- val DATEDIFF_T_T = Types.lookupMethod(
+ val DATEDIFF_T_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long], classOf[TimeZone])
- val DATE_SUB_S = Types.lookupMethod(
+ val DATEDIFF_T_S = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String])
+
+ val DATEDIFF_S_S = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String])
+
+ val DATEDIFF_S_T = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long])
+
+ val DATEDIFF_T_T = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long])
+
+ val DATE_SUB_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int], classOf[TimeZone])
- val DATE_SUB_T = Types.lookupMethod(
+ val DATE_SUB_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int], classOf[TimeZone])
- val DATE_ADD_S = Types.lookupMethod(
+ val DATE_SUB_S = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int])
+
+ val DATE_SUB_T = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int])
+
+ val DATE_ADD_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int], classOf[TimeZone])
- val DATE_ADD_T = Types.lookupMethod(
+ val DATE_ADD_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int], classOf[TimeZone])
+ val DATE_ADD_S = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int])
+
+ val DATE_ADD_T = Types.lookupMethod(
+ classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int])
+
val INT_TO_DATE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toDate",
@@ -317,11 +377,21 @@ object BuiltInMethods {
val STRING_TO_TIMESTAMP = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
- classOf[String], classOf[TimeZone])
+ classOf[String])
val STRING_TO_TIMESTAMP_WITH_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
+ classOf[String], classOf[String])
+
+ val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils],
+ "toTimestamp",
+ classOf[String], classOf[TimeZone])
+
+ val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
+ classOf[SqlDateTimeUtils],
+ "toTimestamp",
classOf[String], classOf[String], classOf[TimeZone])
val TIMESTAMP_TO_BIGINT = Types.lookupMethod(
@@ -329,7 +399,7 @@ object BuiltInMethods {
"fromTimestamp",
classOf[Long])
- val EXTRACT_FROM_TIMESTAMP = Types.lookupMethod(
+ val EXTRACT_FROM_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"extractFromTimestamp",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
@@ -349,12 +419,12 @@ object BuiltInMethods {
"extractYearMonth",
classOf[TimeUnitRange], classOf[Int])
- val TIMESTAMP_FLOOR = Types.lookupMethod(
+ val TIMESTAMP_FLOOR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampFloor",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
- val TIMESTAMP_CEIL = Types.lookupMethod(
+ val TIMESTAMP_CEIL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampCeil",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
index e4dd220..0017d99 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
@@ -38,7 +38,7 @@ class CurrentTimePointCallGen(local: Boolean) extends CallGenerator {
generateNonNullField(returnType, time)
case TIMESTAMP_WITHOUT_TIME_ZONE if local =>
- val timestamp = ctx.addReusableLocalTimestamp()
+ val timestamp = ctx.addReusableLocalDateTime()
generateNonNullField(returnType, timestamp)
case DATE =>
@@ -50,6 +50,7 @@ class CurrentTimePointCallGen(local: Boolean) extends CallGenerator {
generateNonNullField(returnType, time)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
+ // TODO CURRENT_TIMESTAMP should return TIMESTAMP WITH TIME ZONE
val timestamp = ctx.addReusableTimestamp()
generateNonNullField(returnType, timestamp)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
new file mode 100644
index 0000000..eeec6ef
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.GenerateUtils.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+
+import java.lang.reflect.Method
+
+class ExtractCallGen(method: Method)
+ extends MethodCallGen(method) {
+
+ override def generate(
+ ctx: CodeGeneratorContext,
+ operands: Seq[GeneratedExpression],
+ returnType: LogicalType): GeneratedExpression = {
+ val unit = getEnum(operands.head).asInstanceOf[TimeUnitRange].startUnit
+ val tpe = operands(1).resultType
+ unit match {
+ case TimeUnit.YEAR |
+ TimeUnit.MONTH |
+ TimeUnit.DAY |
+ TimeUnit.QUARTER |
+ TimeUnit.DOY |
+ TimeUnit.DOW |
+ TimeUnit.WEEK |
+ TimeUnit.CENTURY |
+ TimeUnit.MILLENNIUM =>
+ tpe.getTypeRoot match {
+ case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
+ return generateCallIfArgsNotNull(ctx, returnType, operands) {
+ (terms) =>
+ s"""
+ |${qualifyMethod(method)}(${terms.head},
+ | ${terms(1)} / ${TimeUnit.DAY.multiplier.intValue()})
+ |""".stripMargin
+ }
+
+ case LogicalTypeRoot.DATE =>
+ return super.generate(ctx, operands, returnType)
+
+ case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
+ throw new ValidationException("unit " + unit + " can not be applied to time variable")
+
+ case _ => // do nothing
+ }
+
+ case _ => // do nothing
+ }
+ generateCallIfArgsNotNull(ctx, returnType, operands) {
+ (terms) => {
+ val factor = getFactor(unit)
+ unit match {
+ case TimeUnit.QUARTER =>
+ s"""
+ |((${terms(1)} % $factor) - 1) / ${unit.multiplier.intValue()} + 1
+ |""".stripMargin
+ case _ =>
+ if (factor == 1) {
+ s"""
+ |${terms(1)} / ${unit.multiplier.intValue()}
+ |""".stripMargin
+ } else {
+ s"""
+ |(${terms(1)} % $factor) / ${unit.multiplier.intValue()}
+ |""".stripMargin
+ }
+ }
+ }
+ }
+ }
+
+ private def getFactor(unit: TimeUnit): Long = {
+ unit match {
+ case TimeUnit.DAY =>
+ 1L
+ case TimeUnit.HOUR =>
+ TimeUnit.DAY.multiplier.longValue()
+ case TimeUnit.MINUTE =>
+ TimeUnit.HOUR.multiplier.longValue()
+ case TimeUnit.SECOND =>
+ TimeUnit.MINUTE.multiplier.longValue()
+ case TimeUnit.MONTH =>
+ TimeUnit.YEAR.multiplier.longValue()
+ case TimeUnit.QUARTER =>
+ TimeUnit.YEAR.multiplier.longValue()
+ case TimeUnit.YEAR |
+ TimeUnit.CENTURY |
+ TimeUnit.MILLENNIUM => 1L
+ case _ =>
+ throw new CodeGenException(s"Unit '$unit' is not supported.")
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index f06cbf1..a3be90a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -364,35 +364,35 @@ object FunctionGenerator {
// Temporal functions
// ----------------------------------------------------------------------------------------------
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, BIGINT),
- BuiltInMethod.UNIX_DATE_EXTRACT.method)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, DATE),
- BuiltInMethod.UNIX_DATE_EXTRACT.method)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, TIME_WITHOUT_TIME_ZONE),
- BuiltInMethods.UNIX_TIME_EXTRACT)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, TIMESTAMP_WITHOUT_TIME_ZONE),
- BuiltInMethods.EXTRACT_FROM_TIMESTAMP)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, INTERVAL_DAY_TIME),
- BuiltInMethods.EXTRACT_FROM_DATE)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
- addSqlFunctionMethod(
+ addSqlFunction(
EXTRACT,
Seq(ANY, INTERVAL_YEAR_MONTH),
- BuiltInMethods.EXTRACT_YEAR_MONTH)
+ new ExtractCallGen(BuiltInMethod.UNIX_DATE_EXTRACT.method))
addSqlFunction(
TIMESTAMP_DIFF,
@@ -436,7 +436,7 @@ object FunctionGenerator {
Seq(TIMESTAMP_WITHOUT_TIME_ZONE, ANY),
new FloorCeilCallGen(
BuiltInMethod.FLOOR.method,
- Some(BuiltInMethods.TIMESTAMP_FLOOR)))
+ Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))
addSqlFunction(
CEIL,
@@ -457,7 +457,7 @@ object FunctionGenerator {
Seq(TIMESTAMP_WITHOUT_TIME_ZONE, ANY),
new FloorCeilCallGen(
BuiltInMethod.CEIL.method,
- Some(BuiltInMethods.TIMESTAMP_CEIL)))
+ Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
addSqlFunction(
CURRENT_DATE,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
index 4e8f282..aa89d09 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
@@ -775,8 +775,7 @@ object ScalarOperatorGens {
case (_, VARCHAR | CHAR) if TypeCheckUtils.isTimePoint(operand.resultType) =>
generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) {
operandTerm =>
- val zoneTerm = ctx.addReusableTimeZone()
- s"${internalToStringCode(operand.resultType, operandTerm.head, zoneTerm)}"
+ s"${localTimeToStringCode(operand.resultType, operandTerm.head)}"
}
// Interval Months -> String
@@ -899,9 +898,9 @@ object ScalarOperatorGens {
operand,
resultNullable = true) {
operandTerm =>
- val zoneTerm = ctx.addReusableTimeZone()
- s"""${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString(),
- | $zoneTerm)""".stripMargin
+ s"""
+ |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
+ """.stripMargin
}
// String -> binary
@@ -1656,8 +1655,7 @@ object ScalarOperatorGens {
val typeTerm = primitiveTypeTermForType(expectType)
val defaultTerm = primitiveDefaultValue(expectType)
val term = newName("stringToTime")
- val zoneTerm = ctx.addReusableTimeZone()
- val code = stringToInternalCode(expectType, rightTerm, zoneTerm)
+ val code = stringToLocalTimeCode(expectType, rightTerm)
val stmt = s"$typeTerm $term = ${stringLiteral.nullTerm} ? $defaultTerm : $code;"
ctx.addReusableMember(stmt)
stringLiteral.copy(resultType = expectType, resultTerm = term)
@@ -2087,32 +2085,31 @@ object ScalarOperatorGens {
}
}
- private def stringToInternalCode(
+ private def stringToLocalTimeCode(
targetType: LogicalType,
- operandTerm: String,
- zoneTerm: String): String =
+ operandTerm: String): String =
targetType.getTypeRoot match {
case DATE =>
s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($operandTerm.toString())"
case TIME_WITHOUT_TIME_ZONE =>
s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($operandTerm.toString())"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
- s"""${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString(),
- | $zoneTerm)""".stripMargin
+ s"""
+ |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
+ |""".stripMargin
case _ => throw new UnsupportedOperationException
}
- private def internalToStringCode(
+ private def localTimeToStringCode(
fromType: LogicalType,
- operandTerm: String,
- zoneTerm: String): String =
+ operandTerm: String): String =
fromType.getTypeRoot match {
case DATE =>
s"${qualifyMethod(BuiltInMethod.UNIX_DATE_TO_STRING.method)}($operandTerm)"
case TIME_WITHOUT_TIME_ZONE =>
s"${qualifyMethod(BuiltInMethods.UNIX_TIME_TO_STRING)}($operandTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE => // including rowtime indicator
- s"${qualifyMethod(BuiltInMethods.TIMESTAMP_TO_STRING)}($operandTerm, 3, $zoneTerm)"
+ s"${qualifyMethod(BuiltInMethods.TIMESTAMP_TO_STRING)}($operandTerm, 3)"
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 458016f..cd04586 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -17,10 +17,11 @@
*/
package org.apache.flink.table.expressions
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalTime => jLocalTime, LocalDateTime}
import java.util.{Calendar, TimeZone}
object Literal {
@@ -41,6 +42,9 @@ object Literal {
case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
+ case localDate: LocalDate => Literal(localDate, LocalTimeTypeInfo.LOCAL_DATE)
+ case localTime: jLocalTime => Literal(localTime, LocalTimeTypeInfo.LOCAL_TIME)
+ case localDateTime: LocalDateTime => Literal(localDateTime, LocalTimeTypeInfo.LOCAL_DATE_TIME)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index 5fe1d0d..d9de5b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -304,16 +304,15 @@ class StreamExecGroupWindowAggregate(
val builder = WindowOperatorBuilder
.builder()
.withInputFields(inputFields.toArray)
- val timeZoneOffset = -config.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
val newBuilder = window match {
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
- builder.tumble(toDuration(size), timeZoneOffset).withProcessingTime()
+ builder.tumble(toDuration(size)).withProcessingTime()
case TumblingGroupWindow(_, timeField, size)
if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
- builder.tumble(toDuration(size), timeZoneOffset).withEventTime(timeIdx)
+ builder.tumble(toDuration(size)).withEventTime(timeIdx)
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
@@ -328,12 +327,12 @@ class StreamExecGroupWindowAggregate(
case SlidingGroupWindow(_, timeField, size, slide)
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
- builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
+ builder.sliding(toDuration(size), toDuration(slide))
.withProcessingTime()
case SlidingGroupWindow(_, timeField, size, slide)
if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
- builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
+ builder.sliding(toDuration(size), toDuration(slide))
.withEventTime(timeIdx)
case SlidingGroupWindow(_, timeField, size, slide)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
index bbea8a8..660401d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
@@ -21,22 +21,20 @@ package org.apache.flink.table.plan.util
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.catalog.{FunctionCatalog, FunctionLookup}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.dataformat.DataFormatConverters.{LocalDateConverter, LocalDateTimeConverter, LocalTimeConverter}
import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{AND, CAST, OR}
import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.util.Logging
import org.apache.flink.util.Preconditions
-import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
-import org.apache.calcite.util.{DateString, TimeString, TimestampString}
-import java.sql.{Date, Time, Timestamp}
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -230,16 +228,16 @@ class RexNodeToExpressionConverter(
val literalValue = literalType.getTypeRoot match {
case DATE =>
- val v = literal.getValueAs(classOf[DateString])
- new Date(DateTimeUtils.dateStringToUnixDate(v.toString) * DateTimeUtils.MILLIS_PER_DAY)
+ val v = literal.getValueAs(classOf[Integer])
+ LocalDateConverter.INSTANCE.toExternal(v)
case TIME_WITHOUT_TIME_ZONE =>
- val v = literal.getValueAs(classOf[TimeString])
- new Time(DateTimeUtils.timeStringToUnixDate(v.toString(0)).longValue())
+ val v = literal.getValueAs(classOf[Integer])
+ LocalTimeConverter.INSTANCE.toExternal(v)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
- val v = literal.getValueAs(classOf[TimestampString])
- new Timestamp(DateTimeUtils.timestampStringToUnixDate(v.toString(3)))
+ val v = literal.getValueAs(classOf[java.lang.Long])
+ LocalDateTimeConverter.INSTANCE.toExternal(v)
case TINYINT =>
// convert from BigDecimal to Byte
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 85113f0..6b39883 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -18,15 +18,16 @@
package org.apache.flink.table.sources.tsextractors
-import java.util
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, TypeInformation, Types}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import java.util
+
/**
* Converts an existing [[Long]], [[java.sql.Timestamp]], or
* timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
@@ -45,6 +46,7 @@ final class ExistingField(val field: String) extends TimestampExtractor {
fieldType match {
case Types.LONG => // OK
case Types.SQL_TIMESTAMP => // OK
+ case Types.LOCAL_DATE_TIME => // OK
case Types.STRING => // OK
case _: TypeInformation[_] =>
throw new ValidationException(
@@ -59,7 +61,7 @@ final class ExistingField(val field: String) extends TimestampExtractor {
*/
override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0)
- .asInstanceOf[ExestingFieldFieldReference]
+ .asInstanceOf[ExestingFieldFieldReference]
val fieldReferenceExpr = new FieldReferenceExpression(
fieldAccess.name,
@@ -80,7 +82,7 @@ final class ExistingField(val field: String) extends TimestampExtractor {
innerDiv,
typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP)))
- case Types.SQL_TIMESTAMP =>
+ case Types.SQL_TIMESTAMP | LocalTimeTypeInfo.LOCAL_DATE_TIME =>
fieldReferenceExpr
case Types.STRING =>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
index f183bcb..afce4f1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
@@ -224,7 +224,7 @@ LogicalProject(id=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[id])
-+- TableSourceScan(table=[[default_catalog, default_database, FilterableTable1, source: [filter=[and(and(greaterThan(tv, 14:25:02), greaterThan(dv, 2017-02-03)), greaterThan(tsv, 2017-02-03 14:25:02.0))]]]], fields=[id, dv, tv, tsv])
++- TableSourceScan(table=[[default_catalog, default_database, FilterableTable1, source: [filter=[and(and(greaterThan(tv, 14:25:02), greaterThan(dv, 2017-02-03)), greaterThan(tsv, 2017-02-03T14:25:02))]]]], fields=[id, dv, tv, tsv])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
index ce0243c..9aadba0 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
@@ -337,7 +337,7 @@ LogicalProject(id=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[id])
-+- TableSourceScan(table=[[default_catalog, default_database, FilterableTable1, source: [filter=[and(and(greaterThan(tv, 14:25:02), greaterThan(dv, 2017-02-03)), greaterThan(tsv, 2017-02-03 14:25:02.0))]]]], fields=[id, dv, tv, tsv])
++- TableSourceScan(table=[[default_catalog, default_database, FilterableTable1, source: [filter=[and(and(greaterThan(tv, 14:25:02), greaterThan(dv, 2017-02-03)), greaterThan(tsv, 2017-02-03T14:25:02))]]]], fields=[id, dv, tv, tsv])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index 7460c48..4c9a105 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -24,7 +24,8 @@ import org.apache.flink.table.expressions.utils.ExpressionTestBase
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.util.DateTimeTestUtil._
import org.apache.flink.types.Row
-import org.junit.Test
+
+import org.junit.{Ignore, Test}
import java.sql.Timestamp
import java.text.SimpleDateFormat
@@ -399,6 +400,7 @@ class TemporalTypesTest extends ExpressionTestBase {
)
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testDateAndTime(): Unit = {
val zones = Seq (
@@ -445,6 +447,7 @@ class TemporalTypesTest extends ExpressionTestBase {
}
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testTemporalShanghai(): Unit = {
config.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
@@ -577,6 +580,7 @@ class TemporalTypesTest extends ExpressionTestBase {
//testSqlApi("CURRENT_TIME", "")
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testUTCTimeZone(): Unit = {
config.setTimeZone(TimeZone.getTimeZone("UTC"))
@@ -626,6 +630,7 @@ class TemporalTypesTest extends ExpressionTestBase {
"2018-03-13 17:02:03")
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testDaylightSavingTimeZone(): Unit = {
config.setTimeZone(TimeZone.getTimeZone("America/New_York"))
@@ -685,6 +690,7 @@ class TemporalTypesTest extends ExpressionTestBase {
//testSqlApi("PROCTIME()", ldt.toString)
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testHourUnitRangoonTimeZone(): Unit = {
// Asia/Rangoon UTC Offset 6.5
@@ -721,6 +727,7 @@ class TemporalTypesTest extends ExpressionTestBase {
}
+ @Ignore // TODO support timestamp with local time zone
@Test
def testTimeZoneFunction(): Unit = {
config.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
@@ -755,13 +762,13 @@ class TemporalTypesTest extends ExpressionTestBase {
override def testData: Row = {
val testData = new Row(16)
- testData.setField(0, UTCDate("1990-10-14"))
- testData.setField(1, UTCTime("10:20:45"))
- testData.setField(2, UTCTimestamp("1990-10-14 10:20:45.123"))
- testData.setField(3, UTCDate("1990-10-13"))
- testData.setField(4, UTCDate("1990-10-15"))
- testData.setField(5, UTCTime("00:00:00"))
- testData.setField(6, UTCTimestamp("1990-10-14 00:00:00.0"))
+ testData.setField(0, localDate("1990-10-14"))
+ testData.setField(1, localTime("10:20:45"))
+ testData.setField(2, localDateTime("1990-10-14 10:20:45.123"))
+ testData.setField(3, localDate("1990-10-13"))
+ testData.setField(4, localDate("1990-10-15"))
+ testData.setField(5, localTime("00:00:00"))
+ testData.setField(6, localDateTime("1990-10-14 00:00:00.0"))
testData.setField(7, 12000)
testData.setField(8, 1467012213000L)
testData.setField(9, 24)
@@ -778,20 +785,20 @@ class TemporalTypesTest extends ExpressionTestBase {
override def typeInfo: RowTypeInfo = {
new RowTypeInfo(
- /* 0 */ Types.SQL_DATE,
- /* 1 */ Types.SQL_TIME,
- /* 2 */ Types.SQL_TIMESTAMP,
- /* 3 */ Types.SQL_DATE,
- /* 4 */ Types.SQL_DATE,
- /* 5 */ Types.SQL_TIME,
- /* 6 */ Types.SQL_TIMESTAMP,
+ /* 0 */ Types.LOCAL_DATE,
+ /* 1 */ Types.LOCAL_TIME,
+ /* 2 */ Types.LOCAL_DATE_TIME,
+ /* 3 */ Types.LOCAL_DATE,
+ /* 4 */ Types.LOCAL_DATE,
+ /* 5 */ Types.LOCAL_TIME,
+ /* 6 */ Types.LOCAL_DATE_TIME,
/* 7 */ Types.INT,
/* 8 */ Types.LONG,
/* 9 */ TimeIntervalTypeInfo.INTERVAL_MONTHS,
/* 10 */ TimeIntervalTypeInfo.INTERVAL_MILLIS,
- /* 11 */ Types.SQL_DATE,
- /* 12 */ Types.SQL_TIME,
- /* 13 */ Types.SQL_TIMESTAMP,
+ /* 11 */ Types.LOCAL_DATE,
+ /* 12 */ Types.LOCAL_TIME,
+ /* 13 */ Types.LOCAL_DATE_TIME,
/* 14 */ Types.STRING,
/* 15 */ Types.LONG)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ArrayTypeTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ArrayTypeTestBase.scala
index bafe377..dc9fa4f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ArrayTypeTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ArrayTypeTestBase.scala
@@ -32,7 +32,7 @@ abstract class ArrayTypeTestBase extends ExpressionTestBase {
testData.setField(0, null)
testData.setField(1, 42)
testData.setField(2, Array(1, 2, 3))
- testData.setField(3, Array(UTCDate("1984-03-12"), UTCDate("1984-02-10")))
+ testData.setField(3, Array(localDate("1984-03-12"), localDate("1984-02-10")))
testData.setField(4, null)
testData.setField(5, Array(Array(1, 2, 3), null))
testData.setField(6, Array[Integer](1, null, null, 4))
@@ -49,7 +49,7 @@ abstract class ArrayTypeTestBase extends ExpressionTestBase {
/* 0 */ Types.INT,
/* 1 */ Types.INT,
/* 2 */ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
- /* 3 */ ObjectArrayTypeInfo.getInfoFor(Types.SQL_DATE),
+ /* 3 */ ObjectArrayTypeInfo.getInfoFor(Types.LOCAL_DATE),
/* 4 */ ObjectArrayTypeInfo.getInfoFor(ObjectArrayTypeInfo.getInfoFor(Types.INT)),
/* 5 */ ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO),
/* 6 */ ObjectArrayTypeInfo.getInfoFor(Types.INT),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
index 78b3b46..f358499 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.dataformat.Decimal
import org.apache.flink.table.typeutils.DecimalTypeInfo
-import org.apache.flink.table.util.DateTimeTestUtil.UTCDate
+import org.apache.flink.table.util.DateTimeTestUtil.localDate
import org.apache.flink.types.Row
abstract class RowTypeTestBase extends ExpressionTestBase {
@@ -36,7 +36,7 @@ abstract class RowTypeTestBase extends ExpressionTestBase {
nestedRow.setField(0, 3)
nestedRow.setField(1, row)
val specialTypeRow = new Row(3)
- specialTypeRow.setField(0, UTCDate("1984-03-12"))
+ specialTypeRow.setField(0, localDate("1984-03-12"))
specialTypeRow.setField(1, Decimal.castFrom("0.00000000", 9, 8))
specialTypeRow.setField(2, Array[java.lang.Integer](1, 2, 3))
val testData = new Row(7)
@@ -57,7 +57,7 @@ abstract class RowTypeTestBase extends ExpressionTestBase {
/* 2 */ Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN),
/* 3 */ Types.ROW(Types.INT, Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN)),
/* 4 */ Types.ROW(
- Types.SQL_DATE,
+ Types.LOCAL_DATE,
DecimalTypeInfo.of(9, 8),
ObjectArrayTypeInfo.getInfoFor(Types.INT)),
/* 5 */ Types.ROW(Types.STRING, Types.BOOLEAN),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 709f7f6..78b067b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -44,7 +44,7 @@ abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
testData.setField(12, null)
testData.setField(13, Row.of("foo", null))
testData.setField(14, null)
- testData.setField(15, UTCDate("1996-11-10"))
+ testData.setField(15, localDate("1996-11-10"))
testData.setField(16, Decimal.castFrom("0.00000000", 19, 8))
testData.setField(17, Decimal.castFrom("10.0", 19, 1))
testData
@@ -67,7 +67,7 @@ abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
/* 12 */ Types.BOOLEAN,
/* 13 */ Types.ROW(Types.STRING, Types.STRING),
/* 14 */ Types.STRING,
- /* 15 */ Types.SQL_DATE,
+ /* 15 */ Types.LOCAL_DATE,
/* 16 */ DecimalTypeInfo.of(19, 8),
/* 17 */ DecimalTypeInfo.of(19, 1)
)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
index d292f0a..0b80ee6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -45,9 +45,9 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(13, -4.6)
testData.setField(14, -3)
testData.setField(15, Decimal.castFrom("-1231.1231231321321321111", 38, 19))
- testData.setField(16, UTCDate("1996-11-10"))
- testData.setField(17, UTCTime("06:55:44"))
- testData.setField(18, UTCTimestamp("1996-11-10 06:55:44.333"))
+ testData.setField(16, localDate("1996-11-10"))
+ testData.setField(17, localTime("06:55:44"))
+ testData.setField(18, localDateTime("1996-11-10 06:55:44.333"))
testData.setField(19, 1467012213000L) // +16979 07:23:33.000
testData.setField(20, 25) // +2-01
testData.setField(21, null)
@@ -74,14 +74,14 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(42, 256.toLong)
testData.setField(43, -1.toLong)
testData.setField(44, 256)
- testData.setField(45, UTCTimestamp("1996-11-10 06:55:44.333").toString)
+ testData.setField(45, localDateTime("1996-11-10 06:55:44.333").toString)
testData.setField(46, "test1=1,test2=2,test3=3")
testData.setField(47, null)
testData.setField(48, false)
testData.setField(49, Decimal.castFrom("1345.1231231321321321111", 38, 19))
- testData.setField(50, UTCDate("1997-11-11"))
- testData.setField(51, UTCTime("09:44:55"))
- testData.setField(52, UTCTimestamp("1997-11-11 09:44:55.333"))
+ testData.setField(50, localDate("1997-11-11"))
+ testData.setField(51, localTime("09:44:55"))
+ testData.setField(52, localDateTime("1997-11-11 09:44:55.333"))
testData.setField(53, "hello world".getBytes)
testData.setField(54, "This is a testing string.".getBytes)
testData
@@ -105,9 +105,9 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
/* 13 */ Types.DOUBLE,
/* 14 */ Types.INT,
/* 15 */ DecimalTypeInfo.of(38, 19),
- /* 16 */ Types.SQL_DATE,
- /* 17 */ Types.SQL_TIME,
- /* 18 */ Types.SQL_TIMESTAMP,
+ /* 16 */ Types.LOCAL_DATE,
+ /* 17 */ Types.LOCAL_TIME,
+ /* 18 */ Types.LOCAL_DATE_TIME,
/* 19 */ TimeIntervalTypeInfo.INTERVAL_MILLIS,
/* 20 */ TimeIntervalTypeInfo.INTERVAL_MONTHS,
/* 21 */ Types.BOOLEAN,
@@ -139,9 +139,9 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
/* 47 */ Types.STRING,
/* 48 */ Types.BOOLEAN,
/* 49 */ DecimalTypeInfo.of(38, 19),
- /* 50 */ Types.SQL_DATE,
- /* 51 */ Types.SQL_TIME,
- /* 52 */ Types.SQL_TIMESTAMP,
+ /* 50 */ Types.LOCAL_DATE,
+ /* 51 */ Types.LOCAL_TIME,
+ /* 52 */ Types.LOCAL_DATE_TIME,
/* 53 */ Types.PRIMITIVE_ARRAY(Types.BYTE),
/* 54 */ Types.PRIMITIVE_ARRAY(Types.BYTE))
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
index c4fd9fe..96427e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.plan.batch.sql
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
import org.apache.flink.table.expressions.utils.Func1
@@ -28,7 +28,6 @@ import org.apache.flink.types.Row
import org.junit.{Before, Test}
-import java.sql.{Date, Time, Timestamp}
import java.util.TimeZone
class TableSourceTest extends TableTestBase {
@@ -155,18 +154,18 @@ class TableSourceTest extends TableTestBase {
val rowTypeInfo = new RowTypeInfo(
Array[TypeInformation[_]](
BasicTypeInfo.INT_TYPE_INFO,
- SqlTimeTypeInfo.DATE,
- SqlTimeTypeInfo.TIME,
- SqlTimeTypeInfo.TIMESTAMP
+ LocalTimeTypeInfo.LOCAL_DATE,
+ LocalTimeTypeInfo.LOCAL_TIME,
+ LocalTimeTypeInfo.LOCAL_DATE_TIME
),
Array("id", "dv", "tv", "tsv")
)
val row = new Row(4)
row.setField(0, 1)
- row.setField(1, Date.valueOf("2017-01-23"))
- row.setField(2, Time.valueOf("14:23:02"))
- row.setField(3, Timestamp.valueOf("2017-01-24 12:45:01.234"))
+ row.setField(1, DateTimeTestUtil.localDate("2017-01-23"))
+ row.setField(2, DateTimeTestUtil.localTime("14:23:02"))
+ row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
val tableSource = TestFilterableTableSource(
isBatch = true, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/AggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/AggregateTestBase.scala
index 3a206ec..0b8c8ed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/AggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/AggregateTestBase.scala
@@ -33,7 +33,7 @@ abstract class AggregateTestBase extends TableTestBase {
util.addTableSource("MyTable",
Array[TypeInformation[_]](
Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, Types.BOOLEAN,
- Types.STRING, Types.SQL_DATE, Types.SQL_TIME, Types.SQL_TIMESTAMP,
+ Types.STRING, Types.LOCAL_DATE, Types.LOCAL_TIME, Types.LOCAL_DATE_TIME,
DecimalTypeInfo.of(30, 20), DecimalTypeInfo.of(10, 5)),
Array("byte", "short", "int", "long", "float", "double", "boolean",
"string", "date", "time", "timestamp", "decimal3020", "decimal105"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/AggregateReduceGroupingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/AggregateReduceGroupingTestBase.scala
index 687f730..128ed21 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/AggregateReduceGroupingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/AggregateReduceGroupingTestBase.scala
@@ -58,7 +58,7 @@ abstract class AggregateReduceGroupingTestBase extends TableTestBase {
.build()
)
util.addTableSource("T4",
- Array[TypeInformation[_]](Types.INT, Types.INT, Types.STRING, Types.SQL_TIMESTAMP),
+ Array[TypeInformation[_]](Types.INT, Types.INT, Types.STRING, Types.LOCAL_DATE_TIME),
Array("a4", "b4", "c4", "d4"),
FlinkStatistic.builder()
.tableStats(new TableStats(100000000))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
index 9dbaaf0..abeb9c5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
@@ -27,7 +27,6 @@ import org.apache.flink.types.Row
import org.junit.{Before, Test}
-import java.sql.{Date, Time, Timestamp}
import java.util.TimeZone
class TableSourceTest extends TableTestBase {
@@ -44,7 +43,7 @@ class TableSourceTest extends TableTestBase {
def testTableSourceWithLongRowTimeField(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -61,9 +60,9 @@ class TableSourceTest extends TableTestBase {
def testTableSourceWithTimestampRowTimeField(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING))
val returnType = new RowTypeInfo(
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
Array("id", "rowtime", "val", "name"))
@@ -78,9 +77,9 @@ class TableSourceTest extends TableTestBase {
def testRowTimeTableSourceGroupWindow(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING))
val returnType = new RowTypeInfo(
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
Array("id", "rowtime", "val", "name"))
@@ -104,7 +103,7 @@ class TableSourceTest extends TableTestBase {
def testProcTimeTableSourceSimple(): Unit = {
val tableSchema = new TableSchema(
Array("id", "pTime", "val", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
Array("id", "val", "name"))
@@ -120,7 +119,7 @@ class TableSourceTest extends TableTestBase {
def testProjectWithRowtimeProctime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -137,7 +136,7 @@ class TableSourceTest extends TableTestBase {
def testProjectWithoutRowtime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -153,7 +152,7 @@ class TableSourceTest extends TableTestBase {
def testProjectWithoutProctime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -170,7 +169,7 @@ class TableSourceTest extends TableTestBase {
def testProjectOnlyProctime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -186,7 +185,7 @@ class TableSourceTest extends TableTestBase {
def testProjectOnlyRowtime(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -203,7 +202,7 @@ class TableSourceTest extends TableTestBase {
def testProjectWithMapping(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.LONG, Types.INT, Types.STRING, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -334,9 +333,9 @@ class TableSourceTest extends TableTestBase {
val row = new Row(4)
row.setField(0, 1)
- row.setField(1, Date.valueOf("2017-01-23"))
- row.setField(2, Time.valueOf("14:23:02"))
- row.setField(3, Timestamp.valueOf("2017-01-24 12:45:01.234"))
+ row.setField(1, DateTimeTestUtil.localDate("2017-01-23"))
+ row.setField(2, DateTimeTestUtil.localTime("14:23:02"))
+ row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
val tableSource = TestFilterableTableSource(
isBatch = false, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
index cbc82c0..a4db880 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
@@ -38,7 +38,7 @@ class AggregateTest extends TableTestBase {
util.addTableSource("MyTable1",
Array[TypeInformation[_]](
Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, Types.BOOLEAN,
- Types.STRING, Types.SQL_DATE, Types.SQL_TIME, Types.SQL_TIMESTAMP,
+ Types.STRING, Types.LOCAL_DATE, Types.LOCAL_TIME, Types.LOCAL_DATE_TIME,
DecimalTypeInfo.of(30, 20), DecimalTypeInfo.of(10, 5)),
Array("byte", "short", "int", "long", "float", "double", "boolean",
"string", "date", "time", "timestamp", "decimal3020", "decimal105"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 880ef99..c3a9e2d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -36,6 +36,7 @@ import org.junit.Test
import _root_.java.lang.{Long => JLong}
import _root_.java.sql.Timestamp
+import _root_.java.time.LocalDateTime
import _root_.java.util.concurrent.CompletableFuture
import _root_.java.util.{Collection => JCollection}
@@ -110,7 +111,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
"SELECT * FROM T AS T JOIN temporalTable " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts",
"The TableSource [TestInvalidTemporalTable(id, name, age, ts)] " +
- "return type BaseRow(id: Integer, name: String, age: Integer, ts: Timestamp) " +
+ "return type BaseRow(id: INT, name: STRING, age: INT, ts: TIMESTAMP(3)) " +
"does not match its lookup function extracted return type String",
classOf[TableException]
)
@@ -122,7 +123,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts",
"Expected: eval(java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, " +
"java.lang.Long) \n" +
- "Actual: eval(java.lang.Integer, java.lang.String, java.sql.Timestamp)",
+ "Actual: eval(java.lang.Integer, java.lang.String, java.time.LocalDateTime)",
classOf[TableException]
)
@@ -157,7 +158,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
"Expected: eval(java.util.concurrent.CompletableFuture, " +
"java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" +
"Actual: eval(java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, " +
- "java.sql.Timestamp)",
+ "java.time.LocalDateTime)",
classOf[TableException]
)
@@ -169,7 +170,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
"Expected: eval(java.util.concurrent.CompletableFuture, " +
"java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" +
"Actual: eval(java.util.concurrent.CompletableFuture, " +
- "java.lang.Integer, java.lang.String, java.sql.Timestamp)",
+ "java.lang.Integer, java.lang.String, java.time.LocalDateTime)",
classOf[TableException]
)
@@ -381,7 +382,7 @@ class TestInvalidTemporalTable private(
val fieldNames: Array[String] = Array("id", "name", "age", "ts")
val fieldTypes: Array[TypeInformation[_]] = Array(
- Types.INT, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)
+ Types.INT, Types.STRING, Types.INT, Types.LOCAL_DATE_TIME)
def this(fetcher: TableFunction[_]) {
this(false, fetcher, null)
@@ -418,7 +419,7 @@ class InvalidTableFunctionResultType extends TableFunction[String] {
}
class InvalidTableFunctionEvalSignature1 extends TableFunction[BaseRow] {
- def eval(a: Integer, b: String, c: Timestamp): Unit = {
+ def eval(a: Integer, b: String, c: LocalDateTime): Unit = {
}
}
@@ -429,7 +430,7 @@ class ValidTableFunction extends TableFunction[BaseRow] {
}
class ValidTableFunction2 extends TableFunction[Row] {
- def eval(a: Integer, b: String, c: Timestamp): Unit = {
+ def eval(a: Integer, b: String, c: LocalDateTime): Unit = {
}
}
@@ -440,13 +441,13 @@ class InvalidAsyncTableFunctionResultType extends AsyncTableFunction[Row] {
}
class InvalidAsyncTableFunctionEvalSignature1 extends AsyncTableFunction[BaseRow] {
- def eval(a: Integer, b: BinaryString, c: Timestamp): Unit = {
+ def eval(a: Integer, b: BinaryString, c: LocalDateTime): Unit = {
}
}
class InvalidAsyncTableFunctionEvalSignature2 extends AsyncTableFunction[BaseRow] {
def eval(resultFuture: CompletableFuture[JCollection[BaseRow]],
- a: Integer, b: String, c: Timestamp): Unit = {
+ a: Integer, b: String, c: LocalDateTime): Unit = {
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index 79311e2..9c303d2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -28,9 +28,8 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, GREA
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.functions.utils.ScalarSqlFunction
import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
-import org.apache.flink.table.util.IntSumAggFunction
+import org.apache.flink.table.util.{DateTimeTestUtil, IntSumAggFunction}
-import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.SqlPostfixOperator
@@ -432,10 +431,9 @@ class RexNodeExtractorTest extends RexNodeTestBase {
relBuilder,
functionCatalog)
- val timestamp = new Timestamp(DateTimeUtils.timestampStringToUnixDate("2017-09-10 14:23:01"))
- val date = new Date(
- DateTimeUtils.dateStringToUnixDate("2017-09-12") * DateTimeUtils.MILLIS_PER_DAY)
- val time = new Time(DateTimeUtils.timeStringToUnixDate("14:23:01").longValue())
+ val timestamp = DateTimeTestUtil.localDateTime("2017-09-10 14:23:01")
+ val date = DateTimeTestUtil.localDate("2017-09-12")
+ val time = DateTimeTestUtil.localTime("14:23:01")
{
val expected = Array[Expression](
@@ -454,7 +452,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val expected = Array[Expression](
EqualTo(
UnresolvedFieldReference("timestamp_col"),
- Literal(timestamp)
+ Literal(Timestamp.valueOf("2017-09-10 14:23:01"))
),
EqualTo(
UnresolvedFieldReference("date_col"),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 0e8690d..14010f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -19,26 +19,32 @@
package org.apache.flink.table.runtime.batch.sql
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.{LOCAL_DATE, LOCAL_DATE_TIME, LOCAL_TIME}
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.{DATE, TIME, TIMESTAMP}
+import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{ExecutionConfigOptions, ValidationException}
-import org.apache.flink.table.dataformat.DataFormatConverters.{DateConverter, TimestampConverter}
+import org.apache.flink.table.dataformat.DataFormatConverters.{LocalDateConverter, LocalDateTimeConverter}
import org.apache.flink.table.dataformat.Decimal
import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortRule
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
+import org.apache.flink.table.runtime.utils.BatchTableEnvUtil.parseFieldNames
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.runtime.utils.TestData._
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, BatchTestBase, UserDefinedFunctionTestUtils}
+import org.apache.flink.table.util.DateTimeTestUtil
import org.apache.flink.table.util.DateTimeTestUtil._
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit._
-import java.sql.{Date, Timestamp}
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime}
import java.util
import scala.collection.Seq
@@ -241,23 +247,23 @@ class CalcITCase extends BatchTestBase {
def testAdvancedDataTypes(): Unit = {
val data = Seq(
row(
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24")))
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24")))
registerCollection(
- "MyTable", data, new RowTypeInfo(DATE, TIME, TIMESTAMP), "a, b, c")
+ "MyTable", data, new RowTypeInfo(LOCAL_DATE, LOCAL_TIME, LOCAL_DATE_TIME), "a, b, c")
checkResult(
"SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
"TIMESTAMP '1984-07-12 14:34:24' FROM MyTable",
Seq(
row(
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"),
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"))))
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"),
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"))))
checkResult(
"SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
@@ -265,12 +271,12 @@ class CalcITCase extends BatchTestBase {
"WHERE a = '1984-07-12' and b = '14:34:24' and c = '1984-07-12 14:34:24'",
Seq(
row(
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"),
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"))))
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"),
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"))))
checkResult(
"SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
@@ -278,12 +284,12 @@ class CalcITCase extends BatchTestBase {
"WHERE '1984-07-12' = a and '14:34:24' = b and '1984-07-12 14:34:24' = c",
Seq(
row(
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"),
- UTCDate("1984-07-12"),
- UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"))))
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"),
+ localDate("1984-07-12"),
+ localTime("14:34:24"),
+ localDateTime("1984-07-12 14:34:24"))))
}
@Test
@@ -312,10 +318,38 @@ class CalcITCase extends BatchTestBase {
@Test
def testTimeUDF(): Unit = {
- registerFunction("func", DateFunction)
- val data = Seq(row(UTCDate("1984-07-12")))
- registerCollection("MyTable", data, new RowTypeInfo(DATE), "a")
- checkResult("SELECT func(a) FROM MyTable", Seq(row(UTCDate("1984-07-12"))))
+ val data = Seq(row(
+ localDate("1984-07-12"),
+ Date.valueOf("1984-07-12"),
+ DateTimeTestUtil.localTime("08:03:09"),
+ Time.valueOf("08:03:09"),
+ localDateTime("2019-09-19 08:03:09"),
+ Timestamp.valueOf("2019-09-19 08:03:09")))
+ registerCollection("MyTable", data,
+ new RowTypeInfo(LOCAL_DATE, DATE, LOCAL_TIME, TIME, LOCAL_DATE_TIME, TIMESTAMP),
+ "a, b, c, d, e, f")
+
+ tEnv.registerFunction("dateFunc", DateFunction)
+ tEnv.registerFunction("localDateFunc", LocalDateFunction)
+ tEnv.registerFunction("timeFunc", TimeFunction)
+ tEnv.registerFunction("localTimeFunc", LocalTimeFunction)
+ tEnv.registerFunction("timestampFunc", TimestampFunction)
+ tEnv.registerFunction("datetimeFunc", DateTimeFunction)
+
+ val v1 = "1984-07-12"
+ val v2 = "08:03:09"
+ val v3 = "2019-09-19 08:03:09.0"
+ val v4 = "2019-09-19T08:03:09"
+ checkResult(
+ "SELECT" +
+ " dateFunc(a), localDateFunc(a), dateFunc(b), localDateFunc(b)," +
+ " timeFunc(c), localTimeFunc(c), timeFunc(d), localTimeFunc(d)," +
+ " timestampFunc(e), datetimeFunc(e), timestampFunc(f), datetimeFunc(f)" +
+ " FROM MyTable",
+ Seq(row(
+ v1, v1, v1, v1,
+ v2, v2, v2, v2,
+ v3, v4, v3, v4)))
}
@Test
@@ -660,9 +694,11 @@ class CalcITCase extends BatchTestBase {
@Test
def testValueConstructor(): Unit = {
- val data = Seq(row("foo", 12, UTCTimestamp("1984-07-12 14:34:24")))
- val tpe = new RowTypeInfo(STRING_TYPE_INFO, INT_TYPE_INFO, TIMESTAMP)
- registerCollection("MyTable", data, tpe, "a, b, c" , Array(false, false, false))
+ val data = Seq(row("foo", 12, localDateTime("1984-07-12 14:34:24")))
+ BatchTableEnvUtil.registerCollection(
+ tEnv, "MyTable", data,
+ new RowTypeInfo(Types.STRING, Types.INT, Types.LOCAL_DATE_TIME),
+ Some(parseFieldNames("a, b, c")), None, None)
val table = parseQuery("SELECT ROW(a, b, c), ARRAY[12, b], MAP[a, c] FROM MyTable " +
"WHERE (a, b, c) = ('foo', 12, TIMESTAMP '1984-07-12 14:34:24')")
@@ -940,12 +976,13 @@ class CalcITCase extends BatchTestBase {
checkResult("SELECT CURRENT_DATE = CURRENT_DATE FROM testTable WHERE a = TRUE",
Seq(row(true)))
- val d0 = DateConverter.INSTANCE.toInternal(new Date(System.currentTimeMillis()))
+ val d0 = LocalDateConverter.INSTANCE.toInternal(
+ unixTimestampToLocalDateTime(System.currentTimeMillis()).toLocalDate)
val table = parseQuery("SELECT CURRENT_DATE FROM testTable WHERE a = TRUE")
val result = executeQuery(table)
- val d1 = DateConverter.INSTANCE.toInternal(
- result.toList.head.getField(0).asInstanceOf[java.sql.Date])
+ val d1 = LocalDateConverter.INSTANCE.toInternal(
+ result.toList.head.getField(0).asInstanceOf[LocalDate])
Assert.assertTrue(d0 <= d1 && d1 - d0 <= 1)
}
@@ -961,8 +998,8 @@ class CalcITCase extends BatchTestBase {
val table = parseQuery("SELECT CURRENT_TIMESTAMP FROM testTable WHERE a = TRUE")
val result = executeQuery(table)
- val ts1 = TimestampConverter.INSTANCE.toInternal(
- result.toList.head.getField(0).asInstanceOf[java.sql.Timestamp])
+ val ts1 = LocalDateTimeConverter.INSTANCE.toInternal(
+ result.toList.head.getField(0).asInstanceOf[LocalDateTime])
val ts2 = System.currentTimeMillis()
@@ -1009,15 +1046,15 @@ class CalcITCase extends BatchTestBase {
def testTimestampCompareWithDateString(): Unit = {
//j 2015-05-20 10:00:00.887
checkResult("SELECT j FROM testTable WHERE j < '2017-11-11'",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"))))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"))))
}
@Test
def testDateCompareWithDateString(): Unit = {
checkResult("SELECT h FROM testTable WHERE h <= '2017-12-12'",
Seq(
- row(UTCDate("2017-12-12")),
- row(UTCDate("2017-12-12"))
+ row(localDate("2017-12-12")),
+ row(localDate("2017-12-12"))
))
}
@@ -1025,8 +1062,8 @@ class CalcITCase extends BatchTestBase {
def testDateEqualsWithDateString(): Unit = {
checkResult("SELECT h FROM testTable WHERE h = '2017-12-12'",
Seq(
- row(UTCDate("2017-12-12")),
- row(UTCDate("2017-12-12"))
+ row(localDate("2017-12-12")),
+ row(localDate("2017-12-12"))
))
}
@@ -1039,7 +1076,7 @@ class CalcITCase extends BatchTestBase {
" DATE_FORMAT('2015-05-20 10:00:00.887', 'yyyy-MM-dd HH:mm:ss', 'yyyy/MM/dd HH:mm:ss')" +
" FROM testTable WHERE a = TRUE",
Seq(
- row(UTCTimestamp("2015-05-20 10:00:00.887"),
+ row(localDateTime("2015-05-20 10:00:00.887"),
"2015/05/20 10:00:00",
"2015/05/20 10:00:00",
"2015/05/20 10:00:00")
@@ -1049,61 +1086,61 @@ class CalcITCase extends BatchTestBase {
@Test
def testYear(): Unit = {
checkResult("SELECT j, YEAR(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "2015")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "2015")))
}
@Test
def testQuarter(): Unit = {
checkResult("SELECT j, QUARTER(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "2")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "2")))
}
@Test
def testMonth(): Unit = {
checkResult("SELECT j, MONTH(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "5")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "5")))
}
@Test
def testWeek(): Unit = {
checkResult("SELECT j, WEEK(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "21")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "21")))
}
@Test
def testDayOfYear(): Unit = {
checkResult("SELECT j, DAYOFYEAR(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "140")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "140")))
}
@Test
def testDayOfMonth(): Unit = {
checkResult("SELECT j, DAYOFMONTH(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "20")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "20")))
}
@Test
def testDayOfWeek(): Unit = {
checkResult("SELECT j, DAYOFWEEK(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "4")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "4")))
}
@Test
def testHour(): Unit = {
checkResult("SELECT j, HOUR(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "10")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "10")))
}
@Test
def testMinute(): Unit = {
checkResult("SELECT j, MINUTE(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "0")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "0")))
}
@Test
def testSecond(): Unit = {
checkResult("SELECT j, SECOND(j) FROM testTable WHERE a = TRUE",
- Seq(row(UTCTimestamp("2015-05-20 10:00:00.887"), "0")))
+ Seq(row(localDateTime("2015-05-20 10:00:00.887"), "0")))
}
@Test
@@ -1158,7 +1195,7 @@ class CalcITCase extends BatchTestBase {
" TO_DATE(CAST(null AS VARCHAR))," +
" TO_DATE('2016-12-31')," +
" TO_DATE('2016-12-31', 'yyyy-MM-dd')",
- Seq(row(null, UTCDate("2016-12-31"), UTCDate("2016-12-31"))))
+ Seq(row(null, localDate("2016-12-31"), localDate("2016-12-31"))))
}
@Test
@@ -1167,7 +1204,7 @@ class CalcITCase extends BatchTestBase {
" TO_TIMESTAMP(CAST(null AS VARCHAR))," +
" TO_TIMESTAMP('2016-12-31 00:12:00')," +
" TO_TIMESTAMP('2016-12-31', 'yyyy-MM-dd')",
- Seq(row(null, UTCTimestamp("2016-12-31 00:12:00"), UTCTimestamp("2016-12-31 00:00:00"))))
+ Seq(row(null, localDateTime("2016-12-31 00:12:00"), localDateTime("2016-12-31 00:00:00"))))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
index 43016d4..c48b0bb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.batch.sql
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, RowTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
@@ -147,8 +147,9 @@ class CorrelateITCase extends BatchTestBase {
@Test
def testLongAndTemporalTypes(): Unit = {
registerCollection("myT", Seq(
- row(UTCDate("1990-10-14"), 1000L, UTCTimestamp("1990-10-14 12:10:10"))),
- new RowTypeInfo(SqlTimeTypeInfo.DATE, LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP),
+ row(localDate("1990-10-14"), 1000L, localDateTime("1990-10-14 12:10:10"))),
+ new RowTypeInfo(LocalTimeTypeInfo.LOCAL_DATE,
+ LONG_TYPE_INFO, LocalTimeTypeInfo.LOCAL_DATE_TIME),
"x, y, z")
registerFunction("func", new JavaTableFunc0)
checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
index 2ccffc0..342f82b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
@@ -1046,40 +1046,40 @@ class OverWindowITCase extends BatchTestBase {
"FOLLOWING) FROM Table6",
Seq(
// a b c d e f
- row(1, 1.1, "a", UTCDate("2017-04-08"), UTCTime("12:00:59"),
- UTCTimestamp("2015-05-20 10:00:00"), 1, 1, 1, 1.1),
-
- row(2, 2.5, "abc", UTCDate("2017-04-09"), UTCTime("12:00:59"),
- UTCTimestamp("2019-09-19 08:03:09"), 2, 1, 2, 2.5),
- row(2, -2.4, "abcd", UTCDate("2017-04-08"), UTCTime("00:00:00"),
- UTCTimestamp("2016-09-01 23:07:06"), 1, 2, 1, 2.5),
-
- row(3, -9.77, "ABC", UTCDate("2016-08-08"), UTCTime("04:15:00"),
- UTCTimestamp("1999-12-12 10:00:02"), 1, 2, 2, -9.77),
- row(3, 0.08, "BCD", UTCDate("2017-04-10"), UTCTime("02:30:00"),
- UTCTimestamp("1999-12-12 10:03:00"), 2, 3, 3, 0.08),
- row(3, 0.0, "abc?", UTCDate("2017-10-11"), UTCTime("23:59:59"),
- UTCTimestamp("1999-12-12 10:00:00"), 3, 1, 1, 0.08),
-
- row(4, 3.14, "CDE", UTCDate("2017-11-11"), UTCTime("02:30:00"),
- UTCTimestamp("2017-11-20 09:00:00"), 4, 4, 4, 3.14),
- row(4, 3.15, "DEF", UTCDate("2017-02-06"), UTCTime("06:00:00"),
- UTCTimestamp("2015-11-19 10:00:00"), 1, 3, 1, 3.15),
- row(4, 3.14, "EFG", UTCDate("2017-05-20"), UTCTime("09:46:18"),
- UTCTimestamp("2015-11-19 10:00:01"), 3, 2, 2, 3.15),
- row(4, 3.16, "FGH", UTCDate("2017-05-19"), UTCTime("11:11:11"),
- UTCTimestamp("2015-11-20 08:59:59"), 2, 1, 3, 3.16),
-
- row(5, -5.9, "GHI", UTCDate("2017-07-20"), UTCTime("22:22:22"),
- UTCTimestamp("1989-06-04 10:00:00.78"), 3, 1, 2, -5.9),
- row(5, 2.71, "HIJ", UTCDate("2017-09-08"), UTCTime("20:09:09"),
- UTCTimestamp("1997-07-01 09:00:00.99"), 4, 2, 3, 2.71),
- row(5, 3.9, "IJK", UTCDate("2017-02-02"), UTCTime("03:03:03"),
- UTCTimestamp("2000-01-01 00:00:00.09"), 1, 5, 4, 3.9),
- row(5, 0.7, "JKL", UTCDate("2017-10-01"), UTCTime("19:00:00"),
- UTCTimestamp("2010-06-01 10:00:00.999"), 5, 3, 5, 3.9),
- row(5, -2.8, "KLM", UTCDate("2017-07-01"), UTCTime("12:00:59"),
- UTCTimestamp("1937-07-07 08:08:08.888"), 2, 4, 1, 3.9)
+ row(1, 1.1, "a", localDate("2017-04-08"), localTime("12:00:59"),
+ localDateTime("2015-05-20 10:00:00"), 1, 1, 1, 1.1),
+
+ row(2, 2.5, "abc", localDate("2017-04-09"), localTime("12:00:59"),
+ localDateTime("2019-09-19 08:03:09"), 2, 1, 2, 2.5),
+ row(2, -2.4, "abcd", localDate("2017-04-08"), localTime("00:00:00"),
+ localDateTime("2016-09-01 23:07:06"), 1, 2, 1, 2.5),
+
+ row(3, -9.77, "ABC", localDate("2016-08-08"), localTime("04:15:00"),
+ localDateTime("1999-12-12 10:00:02"), 1, 2, 2, -9.77),
+ row(3, 0.08, "BCD", localDate("2017-04-10"), localTime("02:30:00"),
+ localDateTime("1999-12-12 10:03:00"), 2, 3, 3, 0.08),
+ row(3, 0.0, "abc?", localDate("2017-10-11"), localTime("23:59:59"),
+ localDateTime("1999-12-12 10:00:00"), 3, 1, 1, 0.08),
+
+ row(4, 3.14, "CDE", localDate("2017-11-11"), localTime("02:30:00"),
+ localDateTime("2017-11-20 09:00:00"), 4, 4, 4, 3.14),
+ row(4, 3.15, "DEF", localDate("2017-02-06"), localTime("06:00:00"),
+ localDateTime("2015-11-19 10:00:00"), 1, 3, 1, 3.15),
+ row(4, 3.14, "EFG", localDate("2017-05-20"), localTime("09:46:18"),
+ localDateTime("2015-11-19 10:00:01"), 3, 2, 2, 3.15),
+ row(4, 3.16, "FGH", localDate("2017-05-19"), localTime("11:11:11"),
+ localDateTime("2015-11-20 08:59:59"), 2, 1, 3, 3.16),
+
+ row(5, -5.9, "GHI", localDate("2017-07-20"), localTime("22:22:22"),
+ localDateTime("1989-06-04 10:00:00.78"), 3, 1, 2, -5.9),
+ row(5, 2.71, "HIJ", localDate("2017-09-08"), localTime("20:09:09"),
+ localDateTime("1997-07-01 09:00:00.99"), 4, 2, 3, 2.71),
+ row(5, 3.9, "IJK", localDate("2017-02-02"), localTime("03:03:03"),
+ localDateTime("2000-01-01 00:00:00.09"), 1, 5, 4, 3.9),
+ row(5, 0.7, "JKL", localDate("2017-10-01"), localTime("19:00:00"),
+ localDateTime("2010-06-01 10:00:00.999"), 5, 3, 5, 3.9),
+ row(5, -2.8, "KLM", localDate("2017-07-01"), localTime("12:00:59"),
+ localDateTime("1937-07-07 08:08:08.888"), 2, 4, 1, 3.9)
)
)
}
@@ -2030,21 +2030,21 @@ class OverWindowITCase extends BatchTestBase {
"SELECT a,d, count(*) over (partition by a order by d RANGE between INTERVAL '0' DAY " +
"FOLLOWING and INTERVAL '2' DAY FOLLOWING) FROM Table6",
Seq(
- row(1, UTCDate("2017-04-08"), 1),
- row(2, UTCDate("2017-04-08"), 2),
- row(2, UTCDate("2017-04-09"), 1),
- row(3, UTCDate("2016-08-08"), 1),
- row(3, UTCDate("2017-04-10"), 1),
- row(3, UTCDate("2017-10-11"), 1),
- row(4, UTCDate("2017-02-06"), 1),
- row(4, UTCDate("2017-05-19"), 2),
- row(4, UTCDate("2017-05-20"), 1),
- row(4, UTCDate("2017-11-11"), 1),
- row(5, UTCDate("2017-02-02"), 1),
- row(5, UTCDate("2017-07-01"), 1),
- row(5, UTCDate("2017-07-20"), 1),
- row(5, UTCDate("2017-09-08"), 1),
- row(5, UTCDate("2017-10-01"), 1)
+ row(1, localDate("2017-04-08"), 1),
+ row(2, localDate("2017-04-08"), 2),
+ row(2, localDate("2017-04-09"), 1),
+ row(3, localDate("2016-08-08"), 1),
+ row(3, localDate("2017-04-10"), 1),
+ row(3, localDate("2017-10-11"), 1),
+ row(4, localDate("2017-02-06"), 1),
+ row(4, localDate("2017-05-19"), 2),
+ row(4, localDate("2017-05-20"), 1),
+ row(4, localDate("2017-11-11"), 1),
+ row(5, localDate("2017-02-02"), 1),
+ row(5, localDate("2017-07-01"), 1),
+ row(5, localDate("2017-07-20"), 1),
+ row(5, localDate("2017-09-08"), 1),
+ row(5, localDate("2017-10-01"), 1)
)
)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
index eab3a22..ecc77a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -18,20 +18,22 @@
package org.apache.flink.table.runtime.batch.sql
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.table.runtime.utils.BatchTestBase
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.util.TestTableSourceWithTime
import org.apache.flink.types.Row
+
import org.junit.Test
import java.lang.{Integer => JInt, Long => JLong}
-import java.sql.Timestamp
import scala.collection.JavaConversions._
@@ -75,7 +77,7 @@ class TableScanITCase extends BatchTestBase {
def testProctimeTableSource(): Unit = {
val tableName = "MyTable"
val data = Seq("Mary", "Peter", "Bob", "Liz")
- val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+ val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.LOCAL_DATE_TIME))
val returnType = Types.STRING
val tableSource = new TestTableSourceWithTime(true, schema, returnType, data, null, "ptime")
@@ -95,15 +97,15 @@ class TableScanITCase extends BatchTestBase {
def testRowtimeTableSource(): Unit = {
val tableName = "MyTable"
val data = Seq(
- row("Mary", new Timestamp(1L), new JInt(10)),
- row("Bob", new Timestamp(2L), new JInt(20)),
- row("Mary", new Timestamp(2L), new JInt(30)),
- row("Liz", new Timestamp(2001L), new JInt(40)))
+ row("Mary", unixTimestampToLocalDateTime(1L), new JInt(10)),
+ row("Bob", unixTimestampToLocalDateTime(2L), new JInt(20)),
+ row("Mary", unixTimestampToLocalDateTime(2L), new JInt(30)),
+ row("Liz", unixTimestampToLocalDateTime(2001L), new JInt(40)))
val fieldNames = Array("name", "rtime", "amount")
- val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+ val schema = new TableSchema(fieldNames, Array(Types.STRING, LOCAL_DATE_TIME, Types.INT))
val rowType = new RowTypeInfo(
- Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ Array(Types.STRING, LOCAL_DATE_TIME, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
val tableSource = new TestTableSourceWithTime(true, schema, rowType, data, "rtime", null)
@@ -112,10 +114,10 @@ class TableScanITCase extends BatchTestBase {
checkResult(
s"SELECT * FROM $tableName",
Seq(
- row("Mary", new Timestamp(1L), new JInt(10)),
- row("Mary", new Timestamp(2L), new JInt(30)),
- row("Bob", new Timestamp(2L), new JInt(20)),
- row("Liz", new Timestamp(2001L), new JInt(40)))
+ row("Mary", unixTimestampToLocalDateTime(1L), new JInt(10)),
+ row("Mary", unixTimestampToLocalDateTime(2L), new JInt(30)),
+ row("Bob", unixTimestampToLocalDateTime(2L), new JInt(20)),
+ row("Liz", unixTimestampToLocalDateTime(2001L), new JInt(40)))
)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnnestITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnnestITCase.scala
index 70da594..04a579d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnnestITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.batch.sql
import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.types.Row
@@ -29,7 +30,6 @@ import org.junit.Test
import java.sql.Timestamp
import scala.collection.Seq
-
import scala.collection.JavaConverters._
class UnnestITCase extends BatchTestBase {
@@ -153,10 +153,10 @@ class UnnestITCase extends BatchTestBase {
@Test
def testTumbleWindowAggregateWithCollectUnnest(): Unit = {
val data = TestData.tupleData3.map {
- case (i, l, s) => row(i, l, s, new Timestamp(i * 1000))
+ case (i, l, s) => row(i, l, s, unixTimestampToLocalDateTime(i * 1000))
}
registerCollection("T", data,
- new RowTypeInfo(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP),
+ new RowTypeInfo(Types.INT, Types.LONG, Types.STRING, Types.LOCAL_DATE_TIME),
"a, b, c, ts")
checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
index 64a3545..bd4c89b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
@@ -20,13 +20,15 @@ package org.apache.flink.table.runtime.batch.sql.agg
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{OptimizerConfigOptions, ExecutionConfigOptions, Types}
import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.table.runtime.utils.BatchTestBase
import org.apache.flink.table.runtime.utils.BatchTestBase.row
-import org.apache.flink.table.util.DateTimeTestUtil.UTCTimestamp
+import org.apache.flink.table.util.DateTimeTestUtil.localDateTime
import org.junit.{Before, Test}
import java.sql.Date
+import java.time.LocalDateTime
import scala.collection.JavaConverters._
import scala.collection.Seq
@@ -73,11 +75,11 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
)
registerCollection("T4",
- Seq(row(1, 1, "A", UTCTimestamp("2018-06-01 10:05:30"), "Hi"),
- row(2, 1, "B", UTCTimestamp("2018-06-01 10:10:10"), "Hello"),
- row(3, 2, "B", UTCTimestamp("2018-06-01 10:15:25"), "Hello world"),
- row(4, 3, "C", UTCTimestamp("2018-06-01 10:36:49"), "I am fine.")),
- new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.STRING),
+ Seq(row(1, 1, "A", localDateTime("2018-06-01 10:05:30"), "Hi"),
+ row(2, 1, "B", localDateTime("2018-06-01 10:10:10"), "Hello"),
+ row(3, 2, "B", localDateTime("2018-06-01 10:15:25"), "Hello world"),
+ row(4, 3, "C", localDateTime("2018-06-01 10:36:49"), "I am fine.")),
+ new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.LOCAL_DATE_TIME, Types.STRING),
"a4, b4, c4, d4, e4",
Array(true, true, true, true, true),
FlinkStatistic.builder().uniqueKeys(Set(Set("a4").asJava).asJava).build()
@@ -101,8 +103,9 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
registerCollection("T6",
(0 until 50000).map(
i => row(i, 1L, if (i % 500 == 0) null else s"Hello$i", "Hello world", 10,
- new Date(i + 1531820000000L))),
- new RowTypeInfo(Types.INT, Types.LONG, Types.STRING, Types.STRING, Types.INT, Types.SQL_DATE),
+ unixTimestampToLocalDateTime(i + 1531820000000L).toLocalDate)),
+ new RowTypeInfo(Types.INT, Types.LONG, Types.STRING,
+ Types.STRING, Types.INT, Types.LOCAL_DATE),
"a6, b6, c6, d6, e6, f6",
Array(true, true, true, true, true, true),
FlinkStatistic.builder().uniqueKeys(Set(Set("a6").asJava).asJava).build()
@@ -285,30 +288,30 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
"TUMBLE_START(d4, INTERVAL '15' MINUTE) AS s, " +
"TUMBLE_END(d4, INTERVAL '15' MINUTE) AS e FROM T4 " +
"GROUP BY a4, e4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, e4, s",
- Seq(row(1, "Hi", UTCTimestamp("2018-06-01 10:00:00.0"), 1D, 1),
- row(2, "Hello", UTCTimestamp("2018-06-01 10:00:00.0"), 1D, 1),
- row(3, "Hello world", UTCTimestamp("2018-06-01 10:15:00.0"), 2D, 1),
- row(4, "I am fine.", UTCTimestamp("2018-06-01 10:30:00.0"), 3D, 1)))
+ Seq(row(1, "Hi", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1D, 1),
+ row(2, "Hello", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1D, 1),
+ row(3, "Hello world", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 2D, 1),
+ row(4, "I am fine.", LocalDateTime.of(2018, 6, 1, 10, 30, 0), 3D, 1)))
checkResult("SELECT a4, c4, s, COUNT(b4) FROM " +
"(SELECT a4, c4, avg(b4) AS b4, " +
"TUMBLE_START(d4, INTERVAL '15' MINUTE) AS s, " +
"TUMBLE_END(d4, INTERVAL '15' MINUTE) AS e FROM T4 " +
"GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, c4, s",
- Seq(row(1, "A", UTCTimestamp("2018-06-01 10:00:00.0"), 1),
- row(2, "B", UTCTimestamp("2018-06-01 10:00:00.0"), 1),
- row(3, "B", UTCTimestamp("2018-06-01 10:15:00.0"), 1),
- row(4, "C", UTCTimestamp("2018-06-01 10:30:00.0"), 1)))
+ Seq(row(1, "A", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1),
+ row(2, "B", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1),
+ row(3, "B", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 1),
+ row(4, "C", LocalDateTime.of(2018, 6, 1, 10, 30, 0), 1)))
checkResult("SELECT a4, c4, e, COUNT(b4) FROM " +
"(SELECT a4, c4, VAR_POP(b4) AS b4, " +
"TUMBLE_START(d4, INTERVAL '15' MINUTE) AS s, " +
"TUMBLE_END(d4, INTERVAL '15' MINUTE) AS e FROM T4 " +
"GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, c4, e",
- Seq(row(1, "A", UTCTimestamp("2018-06-01 10:15:00.0"), 1),
- row(2, "B", UTCTimestamp("2018-06-01 10:15:00.0"), 1),
- row(3, "B", UTCTimestamp("2018-06-01 10:30:00.0"), 1),
- row(4, "C", UTCTimestamp("2018-06-01 10:45:00.0"), 1)))
+ Seq(row(1, "A", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 1),
+ row(2, "B", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 1),
+ row(3, "B", LocalDateTime.of(2018, 6, 1, 10, 30, 0), 1),
+ row(4, "C", LocalDateTime.of(2018, 6, 1, 10, 45, 0), 1)))
checkResult("SELECT a4, b4, c4, COUNT(*) FROM " +
"(SELECT a4, c4, SUM(b4) AS b4, " +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
index 906b1d9..e7b8913c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -36,17 +36,17 @@ class GroupingSetsITCase extends BatchTestBase {
private val TABLE_NAME_EMPS = "emps"
private val empsTypes = new RowTypeInfo(Types.LONG, Types.STRING, Types.INT, Types.STRING,
- Types.STRING, Types.LONG, Types.INT, Types.BOOLEAN, Types.BOOLEAN, Types.SQL_DATE)
+ Types.STRING, Types.LONG, Types.INT, Types.BOOLEAN, Types.BOOLEAN, Types.LOCAL_DATE)
private val empsNames =
"empno, name, deptno, gender, city, empid, age, slacker, manager, joinedat"
private val nullableOfEmps: Array[Boolean] =
Array(false, false, false, true, true, false, true, true, false, false)
private lazy val empsData = Seq(
- row(100L, "Fred", 10, null, null, 40L, 25, true, false, UTCDate("1996-08-03")),
- row(110L, "Eric", 20, "M", "San Francisco", 3L, 80, null, false, UTCDate("2001-01-01")),
- row(110L, "John", 40, "M", "Vancouver", 2L, null, false, true, UTCDate("2002-05-03")),
- row(120L, "Wilma", 20, "F", null, 1L, 5, null, true, UTCDate("2005-09-07")),
- row(130L, "Alice", 40, "F", "Vancouver", 2L, null, false, true, UTCDate("2007-01-01"))
+ row(100L, "Fred", 10, null, null, 40L, 25, true, false, localDate("1996-08-03")),
+ row(110L, "Eric", 20, "M", "San Francisco", 3L, 80, null, false, localDate("2001-01-01")),
+ row(110L, "John", 40, "M", "Vancouver", 2L, null, false, true, localDate("2002-05-03")),
+ row(120L, "Wilma", 20, "F", null, 1L, 5, null, true, localDate("2005-09-07")),
+ row(130L, "Alice", 40, "F", "Vancouver", 2L, null, false, true, localDate("2007-01-01"))
)
private val TABLE_NAME_EMP = "emp"
@@ -78,24 +78,24 @@ class GroupingSetsITCase extends BatchTestBase {
private val TABLE_NAME_SCOTT_EMP = "scott_emp"
private val scottEmpTypes = new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.INT,
- Types.SQL_DATE, Types.DOUBLE, Types.DOUBLE, Types.INT)
+ Types.LOCAL_DATE, Types.DOUBLE, Types.DOUBLE, Types.INT)
private val scottEmpNames = "empno, ename, job, mgr, hiredate, sal, comm, deptno"
private val nullableOfScottEmp = Array(false, false, false, true, false, false, true, false)
private lazy val scottEmpData = Seq(
- row(7369, "SMITH", "CLERK", 7902, UTCDate("1980-12-17"), 800.00, null, 20),
- row(7499, "ALLEN", "SALESMAN", 7698, UTCDate("1981-02-20"), 1600.00, 300.00, 30),
- row(7521, "WARD", "SALESMAN", 7698, UTCDate("1981-02-22"), 1250.00, 500.00, 30),
- row(7566, "JONES", "MANAGER", 7839, UTCDate("1981-02-04"), 2975.00, null, 20),
- row(7654, "MARTIN", "SALESMAN", 7698, UTCDate("1981-09-28"), 1250.00, 1400.00, 30),
- row(7698, "BLAKE", "MANAGER", 7839, UTCDate("1981-01-05"), 2850.00, null, 30),
- row(7782, "CLARK", "MANAGER", 7839, UTCDate("1981-06-09"), 2450.00, null, 10),
- row(7788, "SCOTT", "ANALYST", 7566, UTCDate("1987-04-19"), 3000.00, null, 20),
- row(7839, "KING", "PRESIDENT", null, UTCDate("1981-11-17"), 5000.00, null, 10),
- row(7844, "TURNER", "SALESMAN", 7698, UTCDate("1981-09-08"), 1500.00, 0.00, 30),
- row(7876, "ADAMS", "CLERK", 7788, UTCDate("1987-05-23"), 1100.00, null, 20),
- row(7900, "JAMES", "CLERK", 7698, UTCDate("1981-12-03"), 950.00, null, 30),
- row(7902, "FORD", "ANALYST", 7566, UTCDate("1981-12-03"), 3000.00, null, 20),
- row(7934, "MILLER", "CLERK", 7782, UTCDate("1982-01-23"), 1300.00, null, 10)
+ row(7369, "SMITH", "CLERK", 7902, localDate("1980-12-17"), 800.00, null, 20),
+ row(7499, "ALLEN", "SALESMAN", 7698, localDate("1981-02-20"), 1600.00, 300.00, 30),
+ row(7521, "WARD", "SALESMAN", 7698, localDate("1981-02-22"), 1250.00, 500.00, 30),
+ row(7566, "JONES", "MANAGER", 7839, localDate("1981-02-04"), 2975.00, null, 20),
+ row(7654, "MARTIN", "SALESMAN", 7698, localDate("1981-09-28"), 1250.00, 1400.00, 30),
+ row(7698, "BLAKE", "MANAGER", 7839, localDate("1981-01-05"), 2850.00, null, 30),
+ row(7782, "CLARK", "MANAGER", 7839, localDate("1981-06-09"), 2450.00, null, 10),
+ row(7788, "SCOTT", "ANALYST", 7566, localDate("1987-04-19"), 3000.00, null, 20),
+ row(7839, "KING", "PRESIDENT", null, localDate("1981-11-17"), 5000.00, null, 10),
+ row(7844, "TURNER", "SALESMAN", 7698, localDate("1981-09-08"), 1500.00, 0.00, 30),
+ row(7876, "ADAMS", "CLERK", 7788, localDate("1987-05-23"), 1100.00, null, 20),
+ row(7900, "JAMES", "CLERK", 7698, localDate("1981-12-03"), 950.00, null, 30),
+ row(7902, "FORD", "ANALYST", 7566, localDate("1981-12-03"), 3000.00, null, 20),
+ row(7934, "MILLER", "CLERK", 7782, localDate("1982-01-23"), 1300.00, null, 10)
)
@Before
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
index 36127dd..131fe3d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.batch.sql.agg
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -33,7 +33,7 @@ import org.apache.flink.table.runtime.utils.BatchTestBase
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.runtime.utils.TestData._
import org.apache.flink.table.sources.InputFormatTableSource
-import org.apache.flink.table.util.DateTimeTestUtil.UTCTimestamp
+import org.apache.flink.table.util.DateTimeTestUtil.localDateTime
import org.apache.flink.table.util.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction}
import org.apache.flink.types.Row
@@ -63,27 +63,27 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY a, TUMBLE(ts, INTERVAL '3' SECOND)",
Seq(
- row(1, 1, "1970-01-01 00:00:00.0"),
- row(2, 1, "1970-01-01 00:00:00.0"),
- row(3, 1, "1970-01-01 00:00:03.0"),
- row(4, 1, "1970-01-01 00:00:03.0"),
- row(5, 1, "1970-01-01 00:00:03.0"),
- row(6, 1, "1970-01-01 00:00:06.0"),
- row(7, 1, "1970-01-01 00:00:06.0"),
- row(8, 1, "1970-01-01 00:00:06.0"),
- row(9, 1, "1970-01-01 00:00:09.0"),
- row(10, 1, "1970-01-01 00:00:09.0"),
- row(11, 1, "1970-01-01 00:00:09.0"),
- row(12, 1, "1970-01-01 00:00:12.0"),
- row(13, 1, "1970-01-01 00:00:12.0"),
- row(14, 1, "1970-01-01 00:00:12.0"),
- row(15, 1, "1970-01-01 00:00:15.0"),
- row(16, 1, "1970-01-01 00:00:15.0"),
- row(17, 1, "1970-01-01 00:00:15.0"),
- row(18, 1, "1970-01-01 00:00:18.0"),
- row(19, 1, "1970-01-01 00:00:18.0"),
- row(20, 1, "1970-01-01 00:00:18.0"),
- row(21, 1, "1970-01-01 00:00:21.0")
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(2, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(3, 1, localDateTime("1970-01-01 00:00:03.0")),
+ row(4, 1, localDateTime("1970-01-01 00:00:03.0")),
+ row(5, 1, localDateTime("1970-01-01 00:00:03.0")),
+ row(6, 1, localDateTime("1970-01-01 00:00:06.0")),
+ row(7, 1, localDateTime("1970-01-01 00:00:06.0")),
+ row(8, 1, localDateTime("1970-01-01 00:00:06.0")),
+ row(9, 1, localDateTime("1970-01-01 00:00:09.0")),
+ row(10, 1, localDateTime("1970-01-01 00:00:09.0")),
+ row(11, 1, localDateTime("1970-01-01 00:00:09.0")),
+ row(12, 1, localDateTime("1970-01-01 00:00:12.0")),
+ row(13, 1, localDateTime("1970-01-01 00:00:12.0")),
+ row(14, 1, localDateTime("1970-01-01 00:00:12.0")),
+ row(15, 1, localDateTime("1970-01-01 00:00:15.0")),
+ row(16, 1, localDateTime("1970-01-01 00:00:15.0")),
+ row(17, 1, localDateTime("1970-01-01 00:00:15.0")),
+ row(18, 1, localDateTime("1970-01-01 00:00:18.0")),
+ row(19, 1, localDateTime("1970-01-01 00:00:18.0")),
+ row(20, 1, localDateTime("1970-01-01 00:00:18.0")),
+ row(21, 1, localDateTime("1970-01-01 00:00:21.0"))
)
)
@@ -93,27 +93,27 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY a, TUMBLE(ts, INTERVAL '3' SECOND), b",
Seq(
- row(1, 1, "1970-01-01 00:00:00.0", 1),
- row(2, 1, "1970-01-01 00:00:00.0", 2),
- row(3, 1, "1970-01-01 00:00:03.0", 2),
- row(4, 1, "1970-01-01 00:00:03.0", 3),
- row(5, 1, "1970-01-01 00:00:03.0", 3),
- row(6, 1, "1970-01-01 00:00:06.0", 3),
- row(7, 1, "1970-01-01 00:00:06.0", 4),
- row(8, 1, "1970-01-01 00:00:06.0", 4),
- row(9, 1, "1970-01-01 00:00:09.0", 4),
- row(10, 1, "1970-01-01 00:00:09.0", 4),
- row(11, 1, "1970-01-01 00:00:09.0", 5),
- row(12, 1, "1970-01-01 00:00:12.0", 5),
- row(13, 1, "1970-01-01 00:00:12.0", 5),
- row(14, 1, "1970-01-01 00:00:12.0", 5),
- row(15, 1, "1970-01-01 00:00:15.0", 5),
- row(16, 1, "1970-01-01 00:00:15.0", 6),
- row(17, 1, "1970-01-01 00:00:15.0", 6),
- row(18, 1, "1970-01-01 00:00:18.0", 6),
- row(19, 1, "1970-01-01 00:00:18.0", 6),
- row(20, 1, "1970-01-01 00:00:18.0", 6),
- row(21, 1, "1970-01-01 00:00:21.0", 6)
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0"), 1),
+ row(2, 1, localDateTime("1970-01-01 00:00:00.0"), 2),
+ row(3, 1, localDateTime("1970-01-01 00:00:03.0"), 2),
+ row(4, 1, localDateTime("1970-01-01 00:00:03.0"), 3),
+ row(5, 1, localDateTime("1970-01-01 00:00:03.0"), 3),
+ row(6, 1, localDateTime("1970-01-01 00:00:06.0"), 3),
+ row(7, 1, localDateTime("1970-01-01 00:00:06.0"), 4),
+ row(8, 1, localDateTime("1970-01-01 00:00:06.0"), 4),
+ row(9, 1, localDateTime("1970-01-01 00:00:09.0"), 4),
+ row(10, 1, localDateTime("1970-01-01 00:00:09.0"), 4),
+ row(11, 1, localDateTime("1970-01-01 00:00:09.0"), 5),
+ row(12, 1, localDateTime("1970-01-01 00:00:12.0"), 5),
+ row(13, 1, localDateTime("1970-01-01 00:00:12.0"), 5),
+ row(14, 1, localDateTime("1970-01-01 00:00:12.0"), 5),
+ row(15, 1, localDateTime("1970-01-01 00:00:15.0"), 5),
+ row(16, 1, localDateTime("1970-01-01 00:00:15.0"), 6),
+ row(17, 1, localDateTime("1970-01-01 00:00:15.0"), 6),
+ row(18, 1, localDateTime("1970-01-01 00:00:18.0"), 6),
+ row(19, 1, localDateTime("1970-01-01 00:00:18.0"), 6),
+ row(20, 1, localDateTime("1970-01-01 00:00:18.0"), 6),
+ row(21, 1, localDateTime("1970-01-01 00:00:21.0"), 6)
)
)
@@ -123,19 +123,19 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table6 " +
"GROUP BY a, TUMBLE(f, INTERVAL '10' SECOND)",
Seq(
- row(1, 1.1, 1.1, "2015-05-20 10:00:00.0"),
- row(2, -2.4, -2.4, "2016-09-01 23:07:00.0"),
- row(2, 2.5, 2.5, "2019-09-19 08:03:00.0"),
- row(3, -4.885, -9.77, "1999-12-12 10:00:00.0"),
- row(3, 0.08, 0.08, "1999-12-12 10:03:00.0"),
- row(4, 3.14, 3.14, "2017-11-20 09:00:00.0"),
- row(4, 3.145, 3.14, "2015-11-19 10:00:00.0"),
- row(4, 3.16, 3.16, "2015-11-20 08:59:50.0"),
- row(5, -5.9, -5.9, "1989-06-04 10:00:00.0"),
- row(5, -2.8, -2.8, "1937-07-07 08:08:00.0"),
- row(5, 0.7, 0.7, "2010-06-01 10:00:00.0"),
- row(5, 2.71, 2.71, "1997-07-01 09:00:00.0"),
- row(5, 3.9, 3.9, "2000-01-01 00:00:00.0")
+ row(1, 1.1, 1.1, localDateTime("2015-05-20 10:00:00.0")),
+ row(2, -2.4, -2.4, localDateTime("2016-09-01 23:07:00.0")),
+ row(2, 2.5, 2.5, localDateTime("2019-09-19 08:03:00.0")),
+ row(3, -4.885, -9.77, localDateTime("1999-12-12 10:00:00.0")),
+ row(3, 0.08, 0.08, localDateTime("1999-12-12 10:03:00.0")),
+ row(4, 3.14, 3.14, localDateTime("2017-11-20 09:00:00.0")),
+ row(4, 3.145, 3.14, localDateTime("2015-11-19 10:00:00.0")),
+ row(4, 3.16, 3.16, localDateTime("2015-11-20 08:59:50.0")),
+ row(5, -5.9, -5.9, localDateTime("1989-06-04 10:00:00.0")),
+ row(5, -2.8, -2.8, localDateTime("1937-07-07 08:08:00.0")),
+ row(5, 0.7, 0.7, localDateTime("2010-06-01 10:00:00.0")),
+ row(5, 2.71, 2.71, localDateTime("1997-07-01 09:00:00.0")),
+ row(5, 3.9, 3.9, localDateTime("2000-01-01 00:00:00.0"))
)
)
}
@@ -148,21 +148,21 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY b, HOP(ts, INTERVAL '5' SECOND, INTERVAL '9' SECOND)",
Seq(
- row(1, 1, "1969-12-31 23:59:55.0"),
- row(1, 1, "1970-01-01 00:00:00.0"),
- row(2, 5, "1969-12-31 23:59:55.0"),
- row(2, 5, "1970-01-01 00:00:00.0"),
- row(3, 11, "1970-01-01 00:00:05.0"),
- row(3, 15, "1970-01-01 00:00:00.0"),
- row(4, 10, "1970-01-01 00:00:10.0"),
- row(4, 15, "1970-01-01 00:00:00.0"),
- row(4, 34, "1970-01-01 00:00:05.0"),
- row(5, 15, "1970-01-01 00:00:15.0"),
- row(5, 36, "1970-01-01 00:00:05.0"),
- row(5, 65, "1970-01-01 00:00:10.0"),
- row(6, 111, "1970-01-01 00:00:15.0"),
- row(6, 41, "1970-01-01 00:00:20.0"),
- row(6, 51, "1970-01-01 00:00:10.0")
+ row(1, 1, localDateTime("1969-12-31 23:59:55.0")),
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(2, 5, localDateTime("1969-12-31 23:59:55.0")),
+ row(2, 5, localDateTime("1970-01-01 00:00:00.0")),
+ row(3, 11, localDateTime("1970-01-01 00:00:05.0")),
+ row(3, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 10, localDateTime("1970-01-01 00:00:10.0")),
+ row(4, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 34, localDateTime("1970-01-01 00:00:05.0")),
+ row(5, 15, localDateTime("1970-01-01 00:00:15.0")),
+ row(5, 36, localDateTime("1970-01-01 00:00:05.0")),
+ row(5, 65, localDateTime("1970-01-01 00:00:10.0")),
+ row(6, 111, localDateTime("1970-01-01 00:00:15.0")),
+ row(6, 41, localDateTime("1970-01-01 00:00:20.0")),
+ row(6, 51, localDateTime("1970-01-01 00:00:10.0"))
)
)
@@ -171,21 +171,21 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY b, HOP(ts, INTERVAL '5' SECOND, INTERVAL '9' SECOND)",
Seq(
- row(1, 1, "1969-12-31 23:59:55.0"),
- row(1, 1, "1970-01-01 00:00:00.0"),
- row(2, 5, "1969-12-31 23:59:55.0"),
- row(2, 5, "1970-01-01 00:00:00.0"),
- row(3, 11, "1970-01-01 00:00:05.0"),
- row(3, 15, "1970-01-01 00:00:00.0"),
- row(4, 10, "1970-01-01 00:00:10.0"),
- row(4, 15, "1970-01-01 00:00:00.0"),
- row(4, 34, "1970-01-01 00:00:05.0"),
- row(5, 15, "1970-01-01 00:00:15.0"),
- row(5, 36, "1970-01-01 00:00:05.0"),
- row(5, 65, "1970-01-01 00:00:10.0"),
- row(6, 111, "1970-01-01 00:00:15.0"),
- row(6, 41, "1970-01-01 00:00:20.0"),
- row(6, 51, "1970-01-01 00:00:10.0")
+ row(1, 1, localDateTime("1969-12-31 23:59:55.0")),
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(2, 5, localDateTime("1969-12-31 23:59:55.0")),
+ row(2, 5, localDateTime("1970-01-01 00:00:00.0")),
+ row(3, 11, localDateTime("1970-01-01 00:00:05.0")),
+ row(3, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 10, localDateTime("1970-01-01 00:00:10.0")),
+ row(4, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 34, localDateTime("1970-01-01 00:00:05.0")),
+ row(5, 15, localDateTime("1970-01-01 00:00:15.0")),
+ row(5, 36, localDateTime("1970-01-01 00:00:05.0")),
+ row(5, 65, localDateTime("1970-01-01 00:00:10.0")),
+ row(6, 111, localDateTime("1970-01-01 00:00:15.0")),
+ row(6, 41, localDateTime("1970-01-01 00:00:20.0")),
+ row(6, 51, localDateTime("1970-01-01 00:00:10.0"))
)
)
@@ -236,19 +236,19 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY b, HOP(ts, INTERVAL '5.111' SECOND(1,3), INTERVAL '9' SECOND)",
Seq(
- row(1, 1, "1969-12-31 23:59:54.889"),
- row(1, 1, "1970-01-01 00:00:00.0"),
- row(2, 5, "1969-12-31 23:59:54.889"),
- row(2, 5, "1970-01-01 00:00:00.0"),
- row(3, 6, "1970-01-01 00:00:05.111"),
- row(3, 15, "1970-01-01 00:00:00.0"),
- row(4, 15, "1970-01-01 00:00:00.0"),
- row(4, 34, "1970-01-01 00:00:05.111"),
- row(5, 50, "1970-01-01 00:00:05.111"),
- row(5, 65, "1970-01-01 00:00:10.222"),
- row(6, 111, "1970-01-01 00:00:15.333"),
- row(6, 21, "1970-01-01 00:00:20.444"),
- row(6, 70, "1970-01-01 00:00:10.222")
+ row(1, 1, localDateTime("1969-12-31 23:59:54.889")),
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(2, 5, localDateTime("1969-12-31 23:59:54.889")),
+ row(2, 5, localDateTime("1970-01-01 00:00:00.0")),
+ row(3, 6, localDateTime("1970-01-01 00:00:05.111")),
+ row(3, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 34, localDateTime("1970-01-01 00:00:05.111")),
+ row(5, 50, localDateTime("1970-01-01 00:00:05.111")),
+ row(5, 65, localDateTime("1970-01-01 00:00:10.222")),
+ row(6, 111, localDateTime("1970-01-01 00:00:15.333")),
+ row(6, 21, localDateTime("1970-01-01 00:00:20.444")),
+ row(6, 70, localDateTime("1970-01-01 00:00:10.222"))
)
)
@@ -257,19 +257,19 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY b, HOP(ts, INTERVAL '5.111' SECOND(1,3), INTERVAL '9' SECOND)",
Seq(
- row(1, 1, "1969-12-31 23:59:54.889"),
- row(1, 1, "1970-01-01 00:00:00.0"),
- row(2, 5, "1969-12-31 23:59:54.889"),
- row(2, 5, "1970-01-01 00:00:00.0"),
- row(3, 6, "1970-01-01 00:00:05.111"),
- row(3, 15, "1970-01-01 00:00:00.0"),
- row(4, 15, "1970-01-01 00:00:00.0"),
- row(4, 34, "1970-01-01 00:00:05.111"),
- row(5, 50, "1970-01-01 00:00:05.111"),
- row(5, 65, "1970-01-01 00:00:10.222"),
- row(6, 111, "1970-01-01 00:00:15.333"),
- row(6, 21, "1970-01-01 00:00:20.444"),
- row(6, 70, "1970-01-01 00:00:10.222")
+ row(1, 1, localDateTime("1969-12-31 23:59:54.889")),
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0")),
+ row(2, 5, localDateTime("1969-12-31 23:59:54.889")),
+ row(2, 5, localDateTime("1970-01-01 00:00:00.0")),
+ row(3, 6, localDateTime("1970-01-01 00:00:05.111")),
+ row(3, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 15, localDateTime("1970-01-01 00:00:00.0")),
+ row(4, 34, localDateTime("1970-01-01 00:00:05.111")),
+ row(5, 50, localDateTime("1970-01-01 00:00:05.111")),
+ row(5, 65, localDateTime("1970-01-01 00:00:10.222")),
+ row(6, 111, localDateTime("1970-01-01 00:00:15.333")),
+ row(6, 21, localDateTime("1970-01-01 00:00:20.444")),
+ row(6, 70, localDateTime("1970-01-01 00:00:10.222"))
)
)
@@ -281,14 +281,14 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY HOP(ts, INTERVAL '3' SECOND, INTERVAL '3' SECOND)",
Seq(
- row(12, "1970-01-01 00:00:03.0", "1970-01-01 00:00:06.0"),
- row(21, "1970-01-01 00:00:06.0", "1970-01-01 00:00:09.0"),
- row(21, "1970-01-01 00:00:21.0", "1970-01-01 00:00:24.0"),
- row(3, "1970-01-01 00:00:00.0", "1970-01-01 00:00:03.0"),
- row(30, "1970-01-01 00:00:09.0", "1970-01-01 00:00:12.0"),
- row(39, "1970-01-01 00:00:12.0", "1970-01-01 00:00:15.0"),
- row(48, "1970-01-01 00:00:15.0", "1970-01-01 00:00:18.0"),
- row(57, "1970-01-01 00:00:18.0", "1970-01-01 00:00:21.0")
+ row(12, localDateTime("1970-01-01 00:00:03.0"), localDateTime("1970-01-01 00:00:06.0")),
+ row(21, localDateTime("1970-01-01 00:00:06.0"), localDateTime("1970-01-01 00:00:09.0")),
+ row(21, localDateTime("1970-01-01 00:00:21.0"), localDateTime("1970-01-01 00:00:24.0")),
+ row(3, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:03.0")),
+ row(30, localDateTime("1970-01-01 00:00:09.0"), localDateTime("1970-01-01 00:00:12.0")),
+ row(39, localDateTime("1970-01-01 00:00:12.0"), localDateTime("1970-01-01 00:00:15.0")),
+ row(48, localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:18.0")),
+ row(57, localDateTime("1970-01-01 00:00:18.0"), localDateTime("1970-01-01 00:00:21.0"))
)
)
@@ -299,14 +299,14 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY HOP(ts, INTERVAL '3' SECOND, INTERVAL '3' SECOND)",
Seq(
- row(12, "1970-01-01 00:00:03.0", "1970-01-01 00:00:06.0"),
- row(21, "1970-01-01 00:00:06.0", "1970-01-01 00:00:09.0"),
- row(21, "1970-01-01 00:00:21.0", "1970-01-01 00:00:24.0"),
- row(3, "1970-01-01 00:00:00.0", "1970-01-01 00:00:03.0"),
- row(30, "1970-01-01 00:00:09.0", "1970-01-01 00:00:12.0"),
- row(39, "1970-01-01 00:00:12.0", "1970-01-01 00:00:15.0"),
- row(48, "1970-01-01 00:00:15.0", "1970-01-01 00:00:18.0"),
- row(57, "1970-01-01 00:00:18.0", "1970-01-01 00:00:21.0")
+ row(12, localDateTime("1970-01-01 00:00:03.0"), localDateTime("1970-01-01 00:00:06.0")),
+ row(21, localDateTime("1970-01-01 00:00:06.0"), localDateTime("1970-01-01 00:00:09.0")),
+ row(21, localDateTime("1970-01-01 00:00:21.0"), localDateTime("1970-01-01 00:00:24.0")),
+ row(3, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:03.0")),
+ row(30, localDateTime("1970-01-01 00:00:09.0"), localDateTime("1970-01-01 00:00:12.0")),
+ row(39, localDateTime("1970-01-01 00:00:12.0"), localDateTime("1970-01-01 00:00:15.0")),
+ row(48, localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:18.0")),
+ row(57, localDateTime("1970-01-01 00:00:18.0"), localDateTime("1970-01-01 00:00:21.0"))
)
)
@@ -316,17 +316,17 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY HOP(ts, INTERVAL '2' SECOND, INTERVAL '3' SECOND)",
Seq(
- row(1.5, 3, "1970-01-01 00:00:00.0"),
- row(11.0, 33, "1970-01-01 00:00:10.0"),
- row(13.0, 39, "1970-01-01 00:00:12.0"),
- row(15.0, 45, "1970-01-01 00:00:14.0"),
- row(17.0, 51, "1970-01-01 00:00:16.0"),
- row(19.0, 57, "1970-01-01 00:00:18.0"),
- row(20.5, 41, "1970-01-01 00:00:20.0"),
- row(3.0, 9, "1970-01-01 00:00:02.0"),
- row(5.0, 15, "1970-01-01 00:00:04.0"),
- row(7.0, 21, "1970-01-01 00:00:06.0"),
- row(9.0, 27, "1970-01-01 00:00:08.0")
+ row(1.5, 3, localDateTime("1970-01-01 00:00:00.0")),
+ row(11.0, 33, localDateTime("1970-01-01 00:00:10.0")),
+ row(13.0, 39, localDateTime("1970-01-01 00:00:12.0")),
+ row(15.0, 45, localDateTime("1970-01-01 00:00:14.0")),
+ row(17.0, 51, localDateTime("1970-01-01 00:00:16.0")),
+ row(19.0, 57, localDateTime("1970-01-01 00:00:18.0")),
+ row(20.5, 41, localDateTime("1970-01-01 00:00:20.0")),
+ row(3.0, 9, localDateTime("1970-01-01 00:00:02.0")),
+ row(5.0, 15, localDateTime("1970-01-01 00:00:04.0")),
+ row(7.0, 21, localDateTime("1970-01-01 00:00:06.0")),
+ row(9.0, 27, localDateTime("1970-01-01 00:00:08.0"))
)
)
@@ -335,28 +335,28 @@ class WindowAggregateITCase extends BatchTestBase {
"FROM Table3WithTimestamp " +
"GROUP BY HOP(ts, INTERVAL '2' SECOND, INTERVAL '3' SECOND)",
Seq(
- row(1.5, 3, "1970-01-01 00:00:00.0"),
- row(11.0, 33, "1970-01-01 00:00:10.0"),
- row(13.0, 39, "1970-01-01 00:00:12.0"),
- row(15.0, 45, "1970-01-01 00:00:14.0"),
- row(17.0, 51, "1970-01-01 00:00:16.0"),
- row(19.0, 57, "1970-01-01 00:00:18.0"),
- row(20.5, 41, "1970-01-01 00:00:20.0"),
- row(3.0, 9, "1970-01-01 00:00:02.0"),
- row(5.0, 15, "1970-01-01 00:00:04.0"),
- row(7.0, 21, "1970-01-01 00:00:06.0"),
- row(9.0, 27, "1970-01-01 00:00:08.0")
+ row(1.5, 3, localDateTime("1970-01-01 00:00:00.0")),
+ row(11.0, 33, localDateTime("1970-01-01 00:00:10.0")),
+ row(13.0, 39, localDateTime("1970-01-01 00:00:12.0")),
+ row(15.0, 45, localDateTime("1970-01-01 00:00:14.0")),
+ row(17.0, 51, localDateTime("1970-01-01 00:00:16.0")),
+ row(19.0, 57, localDateTime("1970-01-01 00:00:18.0")),
+ row(20.5, 41, localDateTime("1970-01-01 00:00:20.0")),
+ row(3.0, 9, localDateTime("1970-01-01 00:00:02.0")),
+ row(5.0, 15, localDateTime("1970-01-01 00:00:04.0")),
+ row(7.0, 21, localDateTime("1970-01-01 00:00:06.0")),
+ row(9.0, 27, localDateTime("1970-01-01 00:00:08.0"))
)
)
// millisecond precision sliding windows
val data = Seq(
- row(UTCTimestamp("2016-03-27 09:00:00.41"), 3),
- row(UTCTimestamp("2016-03-27 09:00:00.62"), 6),
- row(UTCTimestamp("2016-03-27 09:00:00.715"), 8)
+ row(localDateTime("2016-03-27 09:00:00.41"), 3),
+ row(localDateTime("2016-03-27 09:00:00.62"), 6),
+ row(localDateTime("2016-03-27 09:00:00.715"), 8)
)
registerCollection(
- "T2", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO),
+ "T2", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO),
"ts, v")
checkResult(
"""
@@ -367,18 +367,18 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T2
|GROUP BY HOP(ts, INTERVAL '0.04' SECOND(1,2), INTERVAL '0.2' SECOND(1,1))
""".stripMargin,
- Seq(row("2016-03-27 09:00:00.24", "2016-03-27 09:00:00.44", 1),
- row("2016-03-27 09:00:00.28", "2016-03-27 09:00:00.48", 1),
- row("2016-03-27 09:00:00.32", "2016-03-27 09:00:00.52", 1),
- row("2016-03-27 09:00:00.36", "2016-03-27 09:00:00.56", 1),
- row("2016-03-27 09:00:00.4", "2016-03-27 09:00:00.6", 1),
- row("2016-03-27 09:00:00.44", "2016-03-27 09:00:00.64", 1),
- row("2016-03-27 09:00:00.48", "2016-03-27 09:00:00.68", 1),
- row("2016-03-27 09:00:00.52", "2016-03-27 09:00:00.72", 2),
- row("2016-03-27 09:00:00.56", "2016-03-27 09:00:00.76", 2),
- row("2016-03-27 09:00:00.6", "2016-03-27 09:00:00.8", 2),
- row("2016-03-27 09:00:00.64", "2016-03-27 09:00:00.84", 1),
- row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
+ Seq(row(localDateTime("2016-03-27 09:00:00.24"), localDateTime("2016-03-27 09:00:00.44"), 1),
+ row(localDateTime("2016-03-27 09:00:00.28"), localDateTime("2016-03-27 09:00:00.48"), 1),
+ row(localDateTime("2016-03-27 09:00:00.32"), localDateTime("2016-03-27 09:00:00.52"), 1),
+ row(localDateTime("2016-03-27 09:00:00.36"), localDateTime("2016-03-27 09:00:00.56"), 1),
+ row(localDateTime("2016-03-27 09:00:00.4"), localDateTime("2016-03-27 09:00:00.6"), 1),
+ row(localDateTime("2016-03-27 09:00:00.44"), localDateTime("2016-03-27 09:00:00.64"), 1),
+ row(localDateTime("2016-03-27 09:00:00.48"), localDateTime("2016-03-27 09:00:00.68"), 1),
+ row(localDateTime("2016-03-27 09:00:00.52"), localDateTime("2016-03-27 09:00:00.72"), 2),
+ row(localDateTime("2016-03-27 09:00:00.56"), localDateTime("2016-03-27 09:00:00.76"), 2),
+ row(localDateTime("2016-03-27 09:00:00.6"), localDateTime("2016-03-27 09:00:00.8"), 2),
+ row(localDateTime("2016-03-27 09:00:00.64"), localDateTime("2016-03-27 09:00:00.84"), 1),
+ row(localDateTime("2016-03-27 09:00:00.68"), localDateTime("2016-03-27 09:00:00.88"), 1))
)
}
@@ -392,7 +392,7 @@ class WindowAggregateITCase extends BatchTestBase {
Types.INT,
Types.LONG,
Types.STRING,
- Types.SQL_TIMESTAMP))
+ Types.LOCAL_DATE_TIME))
// val colStats = Map[String, ColumnStats](
// "ts" -> new ColumnStats(9000000L, 1L, 8D, 8, null, null),
// "a" -> new ColumnStats(10000000L, 1L, 8D, 8, 5, -5),
@@ -506,7 +506,7 @@ class WindowAggregateITCase extends BatchTestBase {
row(null, 4)
)
registerCollection(
- "T1", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO),
+ "T1", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO),
"ts, v")
checkResult(
"""
@@ -520,13 +520,13 @@ class WindowAggregateITCase extends BatchTestBase {
// Tumbling window
data = Seq(
- row(UTCTimestamp("2016-03-27 09:00:05"), 1),
+ row(localDateTime("2016-03-27 09:00:05"), 1),
row(null, 2),
- row(UTCTimestamp("2016-03-27 09:00:32"), 3),
+ row(localDateTime("2016-03-27 09:00:32"), 3),
row(null, 4)
)
registerCollection(
- "T2", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO),
+ "T2", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO),
"ts, v")
checkResult(
"""
@@ -536,8 +536,8 @@ class WindowAggregateITCase extends BatchTestBase {
""".stripMargin,
// null columns are dropped
Seq(
- row("2016-03-27 09:00:00.0", "2016-03-27 09:00:10.0", 1),
- row("2016-03-27 09:00:30.0", "2016-03-27 09:00:40.0", 3))
+ row(localDateTime("2016-03-27 09:00:00.0"), localDateTime("2016-03-27 09:00:10.0"), 1),
+ row(localDateTime("2016-03-27 09:00:30.0"), localDateTime("2016-03-27 09:00:40.0"), 3))
)
data = Seq(
row(null, 1),
@@ -546,7 +546,7 @@ class WindowAggregateITCase extends BatchTestBase {
row(null, 4)
)
registerCollection(
- "T3", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO),
+ "T3", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO),
"ts, v")
checkResult(
"""
@@ -562,9 +562,9 @@ class WindowAggregateITCase extends BatchTestBase {
@Test
def testNegativeInputTimestamp(): Unit = {
// simple tumbling window with record at window start
- var data = Seq(row(UTCTimestamp("2016-03-27 19:39:30"), 1, "a"))
+ var data = Seq(row(localDateTime("2016-03-27 19:39:30"), 1, "a"))
registerCollection(
- "T1", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO, STRING_TYPE_INFO),
+ "T1", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO, STRING_TYPE_INFO),
"ts, value, id")
checkResult(
"""
@@ -573,13 +573,13 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T1
|GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
""".stripMargin,
- Seq(row("2016-03-27 19:39:30.0", "2016-03-27 19:39:40.0", 1))
+ Seq(row(localDateTime("2016-03-27 19:39:30.0"), localDateTime("2016-03-27 19:39:40.0"), 1))
)
// simple tumbling window with record at negative timestamp
- data = Seq(row(UTCTimestamp("1916-03-27 19:39:31"), 1, "a"))
+ data = Seq(row(localDateTime("1916-03-27 19:39:31"), 1, "a"))
registerCollection(
- "T2", data, new RowTypeInfo(TIMESTAMP, INT_TYPE_INFO, STRING_TYPE_INFO),
+ "T2", data, new RowTypeInfo(LOCAL_DATE_TIME, INT_TYPE_INFO, STRING_TYPE_INFO),
"ts, value, id")
checkResult(
"""
@@ -588,7 +588,7 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T2
|GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
""".stripMargin,
- Seq(row("1916-03-27 19:39:30.0", "1916-03-27 19:39:40.0", 1))
+ Seq(row(localDateTime("1916-03-27 19:39:30.0"), localDateTime("1916-03-27 19:39:40.0"), 1))
)
// simple sliding window with record at negative timestamp
@@ -601,7 +601,7 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T2
|GROUP BY HOP(ts, INTERVAL '10' SECOND, INTERVAL '11' SECOND)
""".stripMargin,
- Seq(row("1916-03-27 19:39:30.0", "1916-03-27 19:39:41.0", 1))
+ Seq(row(localDateTime("1916-03-27 19:39:30.0"), localDateTime("1916-03-27 19:39:41.0"), 1))
)
checkResult(
@@ -613,8 +613,9 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T2
|GROUP BY HOP(ts, INTERVAL '0.001' SECOND(1,3), INTERVAL '0.002' SECOND(1,3))
""".stripMargin,
- Seq(row("1916-03-27 19:39:30.999", "1916-03-27 19:39:31.001", 1),
- row("1916-03-27 19:39:31.0", "1916-03-27 19:39:31.002", 1))
+ Seq(
+ row(localDateTime("1916-03-27 19:39:30.999"), localDateTime("1916-03-27 19:39:31.001"), 1),
+ row(localDateTime("1916-03-27 19:39:31.0"), localDateTime("1916-03-27 19:39:31.002"), 1))
)
checkResult(
@@ -626,8 +627,9 @@ class WindowAggregateITCase extends BatchTestBase {
|FROM T2
|GROUP BY HOP(ts, INTERVAL '0.001' SECOND(1,3), INTERVAL '0.002' SECOND(1,3))
""".stripMargin,
- Seq(row("1916-03-27 19:39:30.999", "1916-03-27 19:39:31.001", 1),
- row("1916-03-27 19:39:31.0", "1916-03-27 19:39:31.002", 1))
+ Seq(
+ row(localDateTime("1916-03-27 19:39:30.999"), localDateTime("1916-03-27 19:39:31.001"), 1),
+ row(localDateTime("1916-03-27 19:39:31.0"), localDateTime("1916-03-27 19:39:31.002"), 1))
)
}
@@ -636,7 +638,7 @@ class WindowAggregateITCase extends BatchTestBase {
registerCollection(
"T",
data3WithTimestamp,
- new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, TIMESTAMP),
+ new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, LOCAL_DATE_TIME),
"a, b, c, ts")
val sqlQuery =
@@ -648,16 +650,26 @@ class WindowAggregateITCase extends BatchTestBase {
"GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)"
checkResult(sqlQuery, Seq(
- row(1, 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(2, 2, "1970-01-01 00:00:00.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(3, 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(3, 2, "1970-01-01 00:00:05.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(4, 1, "1970-01-01 00:00:10.0", "1970-01-01 00:00:15.0", "1970-01-01 00:00:14.999"),
- row(4, 3, "1970-01-01 00:00:05.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(5, 1, "1970-01-01 00:00:15.0", "1970-01-01 00:00:20.0", "1970-01-01 00:00:19.999"),
- row(5, 4, "1970-01-01 00:00:10.0", "1970-01-01 00:00:15.0", "1970-01-01 00:00:14.999"),
- row(6, 2, "1970-01-01 00:00:20.0", "1970-01-01 00:00:25.0", "1970-01-01 00:00:24.999"),
- row(6, 4, "1970-01-01 00:00:15.0", "1970-01-01 00:00:20.0", "1970-01-01 00:00:19.999")))
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0"),
+ localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:04.999")),
+ row(2, 2, localDateTime("1970-01-01 00:00:00.0"),
+ localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:04.999")),
+ row(3, 1, localDateTime("1970-01-01 00:00:00.0"),
+ localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:04.999")),
+ row(3, 2, localDateTime("1970-01-01 00:00:05.0"),
+ localDateTime("1970-01-01 00:00:10.0"), localDateTime("1970-01-01 00:00:09.999")),
+ row(4, 1, localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:14.999")),
+ row(4, 3, localDateTime("1970-01-01 00:00:05.0"),
+ localDateTime("1970-01-01 00:00:10.0"), localDateTime("1970-01-01 00:00:09.999")),
+ row(5, 1, localDateTime("1970-01-01 00:00:15.0"),
+ localDateTime("1970-01-01 00:00:20.0"), localDateTime("1970-01-01 00:00:19.999")),
+ row(5, 4, localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:14.999")),
+ row(6, 2, localDateTime("1970-01-01 00:00:20.0"),
+ localDateTime("1970-01-01 00:00:25.0"), localDateTime("1970-01-01 00:00:24.999")),
+ row(6, 4, localDateTime("1970-01-01 00:00:15.0"),
+ localDateTime("1970-01-01 00:00:20.0"), localDateTime("1970-01-01 00:00:19.999"))))
}
@Test
@@ -665,7 +677,7 @@ class WindowAggregateITCase extends BatchTestBase {
registerCollection(
"T",
data3WithTimestamp,
- new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, TIMESTAMP),
+ new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, LOCAL_DATE_TIME),
"a, b, c, ts")
val sqlQuery =
@@ -677,22 +689,38 @@ class WindowAggregateITCase extends BatchTestBase {
"GROUP BY b, HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)"
checkResult(sqlQuery, Seq(
- row(1, 1, "1969-12-31 23:59:55.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(1, 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(2, 2, "1969-12-31 23:59:55.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(2, 2, "1970-01-01 00:00:00.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(3, 1, "1969-12-31 23:59:55.0", "1970-01-01 00:00:05.0", "1970-01-01 00:00:04.999"),
- row(3, 2, "1970-01-01 00:00:05.0", "1970-01-01 00:00:15.0", "1970-01-01 00:00:14.999"),
- row(3, 3, "1970-01-01 00:00:00.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(4, 1, "1970-01-01 00:00:10.0", "1970-01-01 00:00:20.0", "1970-01-01 00:00:19.999"),
- row(4, 3, "1970-01-01 00:00:00.0", "1970-01-01 00:00:10.0", "1970-01-01 00:00:09.999"),
- row(4, 4, "1970-01-01 00:00:05.0", "1970-01-01 00:00:15.0", "1970-01-01 00:00:14.999"),
- row(5, 1, "1970-01-01 00:00:15.0", "1970-01-01 00:00:25.0", "1970-01-01 00:00:24.999"),
- row(5, 4, "1970-01-01 00:00:05.0", "1970-01-01 00:00:15.0", "1970-01-01 00:00:14.999"),
- row(5, 5, "1970-01-01 00:00:10.0", "1970-01-01 00:00:20.0", "1970-01-01 00:00:19.999"),
- row(6, 2, "1970-01-01 00:00:20.0", "1970-01-01 00:00:30.0", "1970-01-01 00:00:29.999"),
- row(6, 4, "1970-01-01 00:00:10.0", "1970-01-01 00:00:20.0", "1970-01-01 00:00:19.999"),
- row(6, 6, "1970-01-01 00:00:15.0", "1970-01-01 00:00:25.0", "1970-01-01 00:00:24.999")))
+ row(1, 1, localDateTime("1969-12-31 23:59:55.0"), localDateTime("1970-01-01 00:00:05.0"),
+ localDateTime("1970-01-01 00:00:04.999")),
+ row(1, 1, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:09.999")),
+ row(2, 2, localDateTime("1969-12-31 23:59:55.0"), localDateTime("1970-01-01 00:00:05.0"),
+ localDateTime("1970-01-01 00:00:04.999")),
+ row(2, 2, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:09.999")),
+ row(3, 1, localDateTime("1969-12-31 23:59:55.0"), localDateTime("1970-01-01 00:00:05.0"),
+ localDateTime("1970-01-01 00:00:04.999")),
+ row(3, 2, localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:15.0"),
+ localDateTime("1970-01-01 00:00:14.999")),
+ row(3, 3, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:09.999")),
+ row(4, 1, localDateTime("1970-01-01 00:00:10.0"), localDateTime("1970-01-01 00:00:20.0"),
+ localDateTime("1970-01-01 00:00:19.999")),
+ row(4, 3, localDateTime("1970-01-01 00:00:00.0"), localDateTime("1970-01-01 00:00:10.0"),
+ localDateTime("1970-01-01 00:00:09.999")),
+ row(4, 4, localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:15.0"),
+ localDateTime("1970-01-01 00:00:14.999")),
+ row(5, 1, localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:25.0"),
+ localDateTime("1970-01-01 00:00:24.999")),
+ row(5, 4, localDateTime("1970-01-01 00:00:05.0"), localDateTime("1970-01-01 00:00:15.0"),
+ localDateTime("1970-01-01 00:00:14.999")),
+ row(5, 5, localDateTime("1970-01-01 00:00:10.0"), localDateTime("1970-01-01 00:00:20.0"),
+ localDateTime("1970-01-01 00:00:19.999")),
+ row(6, 2, localDateTime("1970-01-01 00:00:20.0"), localDateTime("1970-01-01 00:00:30.0"),
+ localDateTime("1970-01-01 00:00:29.999")),
+ row(6, 4, localDateTime("1970-01-01 00:00:10.0"), localDateTime("1970-01-01 00:00:20.0"),
+ localDateTime("1970-01-01 00:00:19.999")),
+ row(6, 6, localDateTime("1970-01-01 00:00:15.0"), localDateTime("1970-01-01 00:00:25.0"),
+ localDateTime("1970-01-01 00:00:24.999"))))
}
@Test(expected = classOf[RuntimeException])
@@ -700,7 +728,7 @@ class WindowAggregateITCase extends BatchTestBase {
registerCollection(
"T",
data3WithTimestamp,
- new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, TIMESTAMP),
+ new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, LOCAL_DATE_TIME),
"a, b, c, ts")
val sqlQuery =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 2b999ed..9cf54d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, R
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.runtime.utils.TestData._
import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, BatchTestBase, CollectionBatchExecTable, UserDefinedFunctionTestUtils}
-import org.apache.flink.table.util.DateTimeTestUtil.{UTCDate, UTCTime, UTCTimestamp}
+import org.apache.flink.table.util.DateTimeTestUtil.localDateTime
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
import org.apache.flink.types.Row
@@ -34,7 +34,8 @@ import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.{Before, Ignore, Test}
-import java.sql.Timestamp
+import java.sql.{Date, Time, Timestamp}
+import java.time.LocalDateTime
import java.util
import scala.collection.JavaConverters._
@@ -287,19 +288,19 @@ class CalcITCase extends BatchTestBase {
val bd2 = BigDecimal("4E+16").bigDecimal
val t = BatchTableEnvUtil.fromCollection(tEnv,
- Seq(
- (bd1, bd2, UTCDate("1984-07-12"), UTCTime("14:34:24"), UTCTimestamp("1984-07-12 14:34:24"))
- ), "_1, _2, _3, _4, _5")
- .select('_1, '_2, '_3, '_4, '_5, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
- UTCDate("1984-07-12"), UTCTime("14:34:24"),
- UTCTimestamp("1984-07-12 14:34:24"))
+ Seq((bd1, bd2, Date.valueOf("1984-07-12"),
+ Time.valueOf("14:34:24"),
+ Timestamp.valueOf("1984-07-12 14:34:24"))), "_1, _2, _3, _4, _5")
+ .select('_1, '_2, '_3, '_4, '_5, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
+ Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
+ Timestamp.valueOf("1984-07-12 14:34:24"))
// inferred Decimal(p,s) from BigDecimal.class
val bd1x = bd1.setScale(Decimal.DECIMAL_SYSTEM_DEFAULT.getScale)
val bd2x = bd2.setScale(Decimal.DECIMAL_SYSTEM_DEFAULT.getScale)
- val expected = s"$bd1x,$bd2x,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
- "11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
+ val expected = s"$bd1x,$bd2x,1984-07-12,14:34:24,1984-07-12T14:34:24," +
+ "11.2,11.2,1984-07-12,14:34:24,1984-07-12T14:34:24"
val results = executeQuery(t)
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -567,8 +568,8 @@ class CalcITCase extends BatchTestBase {
@Test
def testValueConstructor(): Unit = {
- val data = new mutable.MutableList[(String, Int, Timestamp)]
- data.+=(("foo", 12, UTCTimestamp("1984-07-12 14:34:24")))
+ val data = new mutable.MutableList[(String, Int, LocalDateTime)]
+ data.+=(("foo", 12, localDateTime("1984-07-12 14:34:24")))
val t = BatchTableEnvUtil.fromCollection(tEnv, data, "a, b, c").select(
row('a, 'b, 'c),
array(12, 'b),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 244e2b7..0f5b954 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -30,6 +30,8 @@ import org.apache.flink.test.util.TestBaseUtils
import org.junit.{Assert, Ignore, Test}
+import java.sql.{Date, Timestamp}
+
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -200,13 +202,12 @@ class CorrelateITCase extends BatchTestBase {
val result = in
.where('a === 1)
- .select(UTCDate("1990-10-14") as 'x,
+ .select(Date.valueOf("1990-10-14") as 'x,
1000L as 'y,
- UTCTimestamp("1990-10-14 12:10:10") as 'z)
+ Timestamp.valueOf("1990-10-14 12:10:10") as 'z)
.joinLateral(func0('x, 'y, 'z) as 's)
.select('s)
-
val results = executeQuery(result)
val expected = "1000\n" + "655906210000\n" + "7591\n"
TestBaseUtils.compareResultAsText(results.asJava, expected)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
index 7d963af..981d5a2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
@@ -87,12 +87,12 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, 'int.sum, 'w.start, 'w.end, 'w.rowtime)
val expected =
- "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
- "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" +
- "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
- "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
- "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n"
+ "Hello world,3,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1970-01-01T00:00:00.009\n" +
+ "Hello world,4,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1970-01-01T00:00:00.019\n" +
+ "Hello,7,1970-01-01T00:00,1970-01-01T00:00:00.005,1970-01-01T00:00:00.004\n" +
+ "Hello,3,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1970-01-01T00:00:00.009\n" +
+ "Hallo,2,1970-01-01T00:00,1970-01-01T00:00:00.005,1970-01-01T00:00:00.004\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.005,1970-01-01T00:00:00.004\n"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -109,9 +109,9 @@ class GroupWindowITCase extends BatchTestBase {
.select('int.sum, 'w.start, 'w.end, 'w.rowtime)
val expected =
- "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
- "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
- "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n"
+ "10,1970-01-01T00:00,1970-01-01T00:00:00.005,1970-01-01T00:00:00.004\n" +
+ "6,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1970-01-01T00:00:00.009\n" +
+ "4,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1970-01-01T00:00:00.019\n"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -151,7 +151,7 @@ class GroupWindowITCase extends BatchTestBase {
val expected =
"4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006,1970-01-01 00:00:00.005\n" +
- "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
+ "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.010,1970-01-01 00:00:00.009\n" +
"1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018,1970-01-01 00:00:00.017"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -172,12 +172,12 @@ class GroupWindowITCase extends BatchTestBase {
val results = executeQuery(windowedTable)
val expected =
- "Hallo,1,1970-01-01 00:00:00.006\n" +
- "Hello world,1,1970-01-01 00:00:00.012\n" +
- "Hello world,1,1970-01-01 00:00:00.018\n" +
- "Hello,1,1970-01-01 00:00:00.012\n" +
- "Hello,2,1970-01-01 00:00:00.006\n" +
- "Hi,1,1970-01-01 00:00:00.006\n"
+ "Hallo,1,1970-01-01T00:00:00.006\n" +
+ "Hello world,1,1970-01-01T00:00:00.012\n" +
+ "Hello world,1,1970-01-01T00:00:00.018\n" +
+ "Hello,1,1970-01-01T00:00:00.012\n" +
+ "Hello,2,1970-01-01T00:00:00.006\n" +
+ "Hi,1,1970-01-01T00:00:00.006\n"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -211,15 +211,15 @@ class GroupWindowITCase extends BatchTestBase {
.select('int.count, 'w.start, 'w.end, 'w.rowtime)
val expected =
- "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012\n" +
- "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016\n" +
- "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018\n" +
- "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02\n" +
- "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002\n" +
- "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01\n" +
- "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006\n" +
- "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" +
- "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004"
+ "1,1970-01-01T00:00:00.008,1970-01-01T00:00:00.013,1970-01-01T00:00:00.012\n" +
+ "1,1970-01-01T00:00:00.012,1970-01-01T00:00:00.017,1970-01-01T00:00:00.016\n" +
+ "1,1970-01-01T00:00:00.014,1970-01-01T00:00:00.019,1970-01-01T00:00:00.018\n" +
+ "1,1970-01-01T00:00:00.016,1970-01-01T00:00:00.021,1970-01-01T00:00:00.020\n" +
+ "2,1969-12-31T23:59:59.998,1970-01-01T00:00:00.003,1970-01-01T00:00:00.002\n" +
+ "2,1970-01-01T00:00:00.006,1970-01-01T00:00:00.011,1970-01-01T00:00:00.010\n" +
+ "3,1970-01-01T00:00:00.002,1970-01-01T00:00:00.007,1970-01-01T00:00:00.006\n" +
+ "3,1970-01-01T00:00:00.004,1970-01-01T00:00:00.009,1970-01-01T00:00:00.008\n" +
+ "4,1970-01-01T00:00,1970-01-01T00:00:00.005,1970-01-01T00:00:00.004"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -238,17 +238,17 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val expected =
- "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
- "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
- "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
- "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
- "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
- "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
- "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
- "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
- "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
+ "Hallo,1,1969-12-31T23:59:59.995,1970-01-01T00:00:00.005\n" +
+ "Hallo,1,1970-01-01T00:00,1970-01-01T00:00:00.010\n" +
+ "Hello world,1,1970-01-01T00:00,1970-01-01T00:00:00.010\n" +
+ "Hello world,1,1970-01-01T00:00:00.005,1970-01-01T00:00:00.015\n" +
+ "Hello world,1,1970-01-01T00:00:00.010,1970-01-01T00:00:00.020\n" +
+ "Hello world,1,1970-01-01T00:00:00.015,1970-01-01T00:00:00.025\n" +
+ "Hello,1,1970-01-01T00:00:00.005,1970-01-01T00:00:00.015\n" +
+ "Hello,2,1969-12-31T23:59:59.995,1970-01-01T00:00:00.005\n" +
+ "Hello,3,1970-01-01T00:00,1970-01-01T00:00:00.010\n" +
+ "Hi,1,1969-12-31T23:59:59.995,1970-01-01T00:00:00.005\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.010"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -266,14 +266,14 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val expected =
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
- "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
- "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
- "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
- "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
- "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+ "Hallo,1,1970-01-01T00:00,1970-01-01T00:00:00.005\n" +
+ "Hello world,1,1970-01-01T00:00:00.004,1970-01-01T00:00:00.009\n" +
+ "Hello world,1,1970-01-01T00:00:00.008,1970-01-01T00:00:00.013\n" +
+ "Hello world,1,1970-01-01T00:00:00.012,1970-01-01T00:00:00.017\n" +
+ "Hello world,1,1970-01-01T00:00:00.016,1970-01-01T00:00:00.021\n" +
+ "Hello,2,1970-01-01T00:00,1970-01-01T00:00:00.005\n" +
+ "Hello,2,1970-01-01T00:00:00.004,1970-01-01T00:00:00.009\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.005"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -291,9 +291,9 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val expected =
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+ "Hallo,1,1970-01-01T00:00,1970-01-01T00:00:00.005\n" +
+ "Hello,2,1970-01-01T00:00,1970-01-01T00:00:00.005\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.005"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -311,8 +311,8 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val expected =
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
+ "Hallo,1,1970-01-01T00:00,1970-01-01T00:00:00.003\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.003"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -334,8 +334,8 @@ class GroupWindowITCase extends BatchTestBase {
.select('string, countFunc('int), 'w.start, 'w.end)
val expected =
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
+ "Hallo,1,1970-01-01T00:00,1970-01-01T00:00:00.003\n" +
+ "Hi,1,1970-01-01T00:00,1970-01-01T00:00:00.003"
val results = executeQuery(windowedTable)
TestBaseUtils.compareResultAsText(results.asJava, expected)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/OverWindowITCase.scala
index 1452e37..71e9740 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/OverWindowITCase.scala
@@ -1245,21 +1245,21 @@ class OverWindowITCase extends BatchTestBase {
'a, 'd, '*.count over 'w
),
Seq(
- row(1, UTCDate("2017-04-08"), 1),
- row(2, UTCDate("2017-04-08"), 2),
- row(2, UTCDate("2017-04-09"), 1),
- row(3, UTCDate("2016-08-08"), 1),
- row(3, UTCDate("2017-04-10"), 1),
- row(3, UTCDate("2017-10-11"), 1),
- row(4, UTCDate("2017-02-06"), 1),
- row(4, UTCDate("2017-05-19"), 2),
- row(4, UTCDate("2017-05-20"), 1),
- row(4, UTCDate("2017-11-11"), 1),
- row(5, UTCDate("2017-02-02"), 1),
- row(5, UTCDate("2017-07-01"), 1),
- row(5, UTCDate("2017-07-20"), 1),
- row(5, UTCDate("2017-09-08"), 1),
- row(5, UTCDate("2017-10-01"), 1)
+ row(1, localDate("2017-04-08"), 1),
+ row(2, localDate("2017-04-08"), 2),
+ row(2, localDate("2017-04-09"), 1),
+ row(3, localDate("2016-08-08"), 1),
+ row(3, localDate("2017-04-10"), 1),
+ row(3, localDate("2017-10-11"), 1),
+ row(4, localDate("2017-02-06"), 1),
+ row(4, localDate("2017-05-19"), 2),
+ row(4, localDate("2017-05-20"), 1),
+ row(4, localDate("2017-11-11"), 1),
+ row(5, localDate("2017-02-02"), 1),
+ row(5, localDate("2017-07-01"), 1),
+ row(5, localDate("2017-07-20"), 1),
+ row(5, localDate("2017-09-08"), 1),
+ row(5, localDate("2017-10-01"), 1)
)
)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
index 969206f..15811e7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
@@ -32,9 +32,9 @@ import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.MiniB
import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
-import org.apache.flink.table.runtime.utils.{StreamingWithAggTestBase, TestData, TestingRetractSink}
+import org.apache.flink.table.runtime.utils.{BatchTestBase, StreamingWithAggTestBase, TestData, TestingRetractSink}
import org.apache.flink.table.typeutils.BigDecimalTypeInfo
-import org.apache.flink.table.util.DateTimeTestUtil._
+import org.apache.flink.table.util.DateTimeTestUtil.{localDate, localDateTime, localTime => mLocalTime}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -1152,21 +1152,22 @@ class AggregateITCase(
@Test
def testTimestampDistinct(): Unit = {
- val data = new mutable.MutableList[(java.sql.Timestamp, Long, String)]
- data.+=((UTCTimestamp("1970-01-01 00:00:01"), 1L, "A"))
- data.+=((UTCTimestamp("1970-01-01 00:00:02"), 2L, "B"))
- data.+=((UTCTimestamp("1970-01-01 00:00:03"), 2L, "B"))
- data.+=((UTCTimestamp("1970-01-01 00:00:04"), 3L, "C"))
- data.+=((UTCTimestamp("1970-01-01 00:00:05"), 3L, "C"))
- data.+=((UTCTimestamp("1970-01-01 00:00:06"), 3L, "C"))
- data.+=((UTCTimestamp("1970-01-01 00:00:07"), 4L, "B"))
- data.+=((UTCTimestamp("1970-01-01 00:00:08"), 4L, "A"))
- data.+=((UTCTimestamp("1970-01-01 00:00:09"), 4L, "D"))
- data.+=((UTCTimestamp("1970-01-01 00:00:10"), 4L, "E"))
- data.+=((UTCTimestamp("1970-01-01 00:00:11"), 5L, "A"))
- data.+=((UTCTimestamp("1970-01-01 00:00:12"), 5L, "B"))
-
- val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+ val data = new mutable.MutableList[Row]
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:01"), Long.box(1L), "A"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:02"), Long.box(2L), "B"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:03"), Long.box(2L), "B"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:04"), Long.box(3L), "C"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:05"), Long.box(3L), "C"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:06"), Long.box(3L), "C"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:07"), Long.box(4L), "B"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:08"), Long.box(4L), "A"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:09"), Long.box(4L), "D"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:10"), Long.box(4L), "E"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:11"), Long.box(5L), "A"))
+ data.+=(Row.of(localDateTime("1970-01-01 00:00:12"), Long.box(5L), "B"))
+
+ val t = failingDataSource(data)(new RowTypeInfo(
+ Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
tEnv.registerTable("T", t)
val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
@@ -1180,21 +1181,22 @@ class AggregateITCase(
@Test
def testDateDistinct(): Unit = {
- val data = new mutable.MutableList[(java.sql.Date, Long, String)]
- data.+=((UTCDate("1970-01-01"), 1L, "A"))
- data.+=((UTCDate("1970-01-02"), 2L, "B"))
- data.+=((UTCDate("1970-01-03"), 2L, "B"))
- data.+=((UTCDate("1970-01-04"), 3L, "C"))
- data.+=((UTCDate("1970-01-05"), 3L, "C"))
- data.+=((UTCDate("1970-01-06"), 3L, "C"))
- data.+=((UTCDate("1970-01-07"), 4L, "B"))
- data.+=((UTCDate("1970-01-08"), 4L, "A"))
- data.+=((UTCDate("1970-01-09"), 4L, "D"))
- data.+=((UTCDate("1970-01-10"), 4L, "E"))
- data.+=((UTCDate("1970-01-11"), 5L, "A"))
- data.+=((UTCDate("1970-01-12"), 5L, "B"))
-
- val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+ val data = new mutable.MutableList[Row]
+ data.+=(Row.of(localDate("1970-01-01"), Long.box(1L), "A"))
+ data.+=(Row.of(localDate("1970-01-02"), Long.box(2L), "B"))
+ data.+=(Row.of(localDate("1970-01-03"), Long.box(2L), "B"))
+ data.+=(Row.of(localDate("1970-01-04"), Long.box(3L), "C"))
+ data.+=(Row.of(localDate("1970-01-05"), Long.box(3L), "C"))
+ data.+=(Row.of(localDate("1970-01-06"), Long.box(3L), "C"))
+ data.+=(Row.of(localDate("1970-01-07"), Long.box(4L), "B"))
+ data.+=(Row.of(localDate("1970-01-08"), Long.box(4L), "A"))
+ data.+=(Row.of(localDate("1970-01-09"), Long.box(4L), "D"))
+ data.+=(Row.of(localDate("1970-01-10"), Long.box(4L), "E"))
+ data.+=(Row.of(localDate("1970-01-11"), Long.box(5L), "A"))
+ data.+=(Row.of(localDate("1970-01-12"), Long.box(5L), "B"))
+
+ val t = failingDataSource(data)(new RowTypeInfo(
+ Types.LOCAL_DATE, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
tEnv.registerTable("T", t)
val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
@@ -1208,21 +1210,22 @@ class AggregateITCase(
@Test
def testTimeDistinct(): Unit = {
- val data = new mutable.MutableList[(java.sql.Time, Long, String)]
- data.+=((UTCTime("00:00:01"), 1L, "A"))
- data.+=((UTCTime("00:00:02"), 2L, "B"))
- data.+=((UTCTime("00:00:03"), 2L, "B"))
- data.+=((UTCTime("00:00:04"), 3L, "C"))
- data.+=((UTCTime("00:00:05"), 3L, "C"))
- data.+=((UTCTime("00:00:06"), 3L, "C"))
- data.+=((UTCTime("00:00:07"), 4L, "B"))
- data.+=((UTCTime("00:00:08"), 4L, "A"))
- data.+=((UTCTime("00:00:09"), 4L, "D"))
- data.+=((UTCTime("00:00:10"), 4L, "E"))
- data.+=((UTCTime("00:00:11"), 5L, "A"))
- data.+=((UTCTime("00:00:12"), 5L, "B"))
-
- val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
+ val data = new mutable.MutableList[Row]
+ data.+=(Row.of(mLocalTime("00:00:01"), Long.box(1L), "A"))
+ data.+=(Row.of(mLocalTime("00:00:02"), Long.box(2L), "B"))
+ data.+=(Row.of(mLocalTime("00:00:03"), Long.box(2L), "B"))
+ data.+=(Row.of(mLocalTime("00:00:04"), Long.box(3L), "C"))
+ data.+=(Row.of(mLocalTime("00:00:05"), Long.box(3L), "C"))
+ data.+=(Row.of(mLocalTime("00:00:06"), Long.box(3L), "C"))
+ data.+=(Row.of(mLocalTime("00:00:07"), Long.box(4L), "B"))
+ data.+=(Row.of(mLocalTime("00:00:08"), Long.box(4L), "A"))
+ data.+=(Row.of(mLocalTime("00:00:09"), Long.box(4L), "D"))
+ data.+=(Row.of(mLocalTime("00:00:10"), Long.box(4L), "E"))
+ data.+=(Row.of(mLocalTime("00:00:11"), Long.box(5L), "A"))
+ data.+=(Row.of(mLocalTime("00:00:12"), Long.box(5L), "B"))
+
+ val t = failingDataSource(data)(new RowTypeInfo(
+ Types.LOCAL_TIME, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
tEnv.registerTable("T", t)
val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
index 981c3b8..b3e0c0e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -308,7 +308,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
result.addSink(sink)
env.execute()
- val expected = List("ACME,2,1970-01-01 00:00:03.000")
+ val expected = List("ACME,2,1970-01-01T00:00:03")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
@@ -362,8 +362,8 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
env.execute()
val expected = List(
- "ACME,3,1970-01-01 00:00:02.999,1970-01-01 00:00:00.000",
- "ACME,2,1970-01-01 00:00:05.999,1970-01-01 00:00:03.000")
+ "ACME,3,1970-01-01T00:00:02.999,1970-01-01T00:00",
+ "ACME,2,1970-01-01T00:00:05.999,1970-01-01T00:00:03")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
index ba73c6c..b4fd181 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
@@ -79,7 +79,7 @@ class TableScanITCase extends StreamingTestBase {
val data = Seq("Mary", "Peter", "Bob", "Liz")
- val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+ val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.LOCAL_DATE_TIME))
val returnType = Types.STRING
val tableSource = new TestTableSourceWithTime(false, schema, returnType, data, null, "ptime")
@@ -112,7 +112,7 @@ class TableScanITCase extends StreamingTestBase {
)
val fieldNames = Array("id", "rtime", "name")
- val schema = new TableSchema(fieldNames, Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
+ val schema = new TableSchema(fieldNames, Array(Types.INT, Types.LOCAL_DATE_TIME, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index fa4922a..5441992 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -45,7 +45,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -79,7 +79,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
Array(
- Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -95,10 +95,10 @@ class TableSourceITCase extends StreamingTestBase {
env.execute()
val expected = Seq(
- "1970-01-01 00:00:00.001,Mary,1",
- "1970-01-01 00:00:00.002,Bob,2",
- "1970-01-01 00:00:00.002,Mike,3",
- "1970-01-01 00:00:02.001,Liz,4")
+ "1970-01-01T00:00:00.001,Mary,1",
+ "1970-01-01T00:00:00.002,Bob,2",
+ "1970-01-01T00:00:00.002,Mike,3",
+ "1970-01-01T00:00:02.001,Liz,4")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
@@ -112,7 +112,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -145,7 +145,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -174,7 +174,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -207,7 +207,7 @@ class TableSourceITCase extends StreamingTestBase {
val tableSchema = new TableSchema(
Array("id", "rtime", "val", "ptime", "name"),
- Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ Array(Types.INT, Types.LOCAL_DATE_TIME, Types.LONG, Types.LOCAL_DATE_TIME, Types.STRING))
val returnType = new RowTypeInfo(
Array(Types.LONG, Types.INT, Types.STRING, Types.LONG)
.asInstanceOf[Array[TypeInformation[_]]],
@@ -225,10 +225,10 @@ class TableSourceITCase extends StreamingTestBase {
env.execute()
val expected = Seq(
- "Mary,1970-01-01 00:00:00.001,10",
- "Bob,1970-01-01 00:00:00.002,20",
- "Mike,1970-01-01 00:00:00.002,30",
- "Liz,1970-01-01 00:00:02.001,40")
+ "Mary,1970-01-01T00:00:00.001,10",
+ "Bob,1970-01-01T00:00:00.002,20",
+ "Mike,1970-01-01T00:00:00.002,30",
+ "Liz,1970-01-01T00:00:02.001,40")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowAggregateITCase.scala
index 4f15a72..e2b782e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowAggregateITCase.scala
@@ -85,16 +85,16 @@ class WindowAggregateITCase(mode: StateBackendMode)
env.execute()
val expected = Seq(
- "Hallo,1970-01-01 00:00:00.000,1970-01-01 00:00:00.004,1,1,1,1,a",
- "Hello world,1970-01-01 00:00:00.004,1970-01-01 00:00:00.008,1,1,1,1,a",
- "Hello world,1970-01-01 00:00:00.008,1970-01-01 00:00:00.012,1,1,1,1,a",
- "Hello world,1970-01-01 00:00:00.012,1970-01-01 00:00:00.016,1,1,1,1,b",
- "Hello world,1970-01-01 00:00:00.016,1970-01-01 00:00:00.020,1,1,1,1,b",
- "Hello,1970-01-01 00:00:00.000,1970-01-01 00:00:00.004,2,2,2,2,a",
- "Hello,1970-01-01 00:00:00.004,1970-01-01 00:00:00.008,3,3,3,2,a|b",
- "Hi,1970-01-01 00:00:00.000,1970-01-01 00:00:00.004,1,1,1,1,a",
- "null,1970-01-01 00:00:00.028,1970-01-01 00:00:00.032,1,1,1,1,null",
- "null,1970-01-01 00:00:00.032,1970-01-01 00:00:00.036,1,1,1,1,null")
+ "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
+ "Hello world,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,1,1,1,1,a",
+ "Hello world,1970-01-01T00:00:00.008,1970-01-01T00:00:00.012,1,1,1,1,a",
+ "Hello world,1970-01-01T00:00:00.012,1970-01-01T00:00:00.016,1,1,1,1,b",
+ "Hello world,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,1,b",
+ "Hello,1970-01-01T00:00,1970-01-01T00:00:00.004,2,2,2,2,a",
+ "Hello,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,3,3,3,2,a|b",
+ "Hi,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
+ "null,1970-01-01T00:00:00.028,1970-01-01T00:00:00.032,1,1,1,1,null",
+ "null,1970-01-01T00:00:00.032,1970-01-01T00:00:00.036,1,1,1,1,null")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
@@ -137,9 +137,9 @@ class WindowAggregateITCase(mode: StateBackendMode)
env.execute()
val expected = Seq(
- "Hello World,1970-01-01 00:00:00.009,1970-01-01 00:00:00.013,1,1,1,9,1",
- "Hello,1970-01-01 00:00:00.016,1970-01-01 00:00:00.020,1,1,1,16,1",
- "Hello,1970-01-01 00:00:00.001,1970-01-01 00:00:00.012,4,4,4,15,3")
+ "Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1",
+ "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1",
+ "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
@@ -186,8 +186,8 @@ class WindowAggregateITCase(mode: StateBackendMode)
val fieldTypes: Array[TypeInformation[_]] = Array(
Types.STRING,
- Types.SQL_TIMESTAMP,
- Types.SQL_TIMESTAMP,
+ Types.LOCAL_DATE_TIME,
+ Types.LOCAL_DATE_TIME,
Types.LONG,
Types.LONG,
Types.INT,
@@ -203,10 +203,10 @@ class WindowAggregateITCase(mode: StateBackendMode)
tEnv.execute("test")
val expected = Seq(
- "Hi,1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,1,1,1,1,1,1,1",
- "Hello,1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,2,3,2,3,2,3,7",
- "Hello world,1970-01-01 00:00:00.015,1970-01-01 00:00:00.020,1,1,3,16,3,3,3",
- "Hello world,1970-01-01 00:00:00.005,1970-01-01 00:00:00.010,2,2,3,8,3,4,7")
+ "Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1,1,1,1,1,1,1",
+ "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,2,3,2,3,2,3,7",
+ "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1,1,3,16,3,3,3",
+ "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,2,2,3,8,3,4,7")
assertEquals(expected.sorted.mkString("\n"), sink.getUpsertResults.sorted.mkString("\n"))
}
@@ -240,9 +240,9 @@ class WindowAggregateITCase(mode: StateBackendMode)
env.execute()
val expected = Seq(
- "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L}
- "Hello,1,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged
- "Hello,3,1970-01-01 00:00:00.015" // window starts at [1L,2L],
+ "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till {14L}
+ "Hello,1,1970-01-01T00:00:00.021", // window starts at [16L] till {21L}, not merged
+ "Hello,3,1970-01-01T00:00:00.015" // window starts at [1L,2L],
// merged with [8L,10L], by [4L], till {15L}
)
assertEquals(expected.sorted, sink.getAppendResults.sorted)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowJoinITCase.scala
index b3e3d6b..5bb5397 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/WindowJoinITCase.scala
@@ -425,11 +425,11 @@ class WindowJoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
result.addSink(sink)
env.execute()
val expected = mutable.MutableList[String](
- "A,1970-01-01 00:00:04.000,3",
- "A,1970-01-01 00:00:12.000,2",
- "A,1970-01-01 00:00:16.000,1",
- //"B,1970-01-01 00:00:04.0,1",
- "B,1970-01-01 00:00:08.000,1")
+ "A,1970-01-01T00:00:04,3",
+ "A,1970-01-01T00:00:12,2",
+ "A,1970-01-01T00:00:16,1",
+ //"B,1970-01-01T00:00:04,1",
+ "B,1970-01-01T00:00:08,1")
assertEquals(expected.toList.sorted, sink.getAppendResults.sorted)
}
@@ -476,9 +476,9 @@ class WindowJoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
result.addSink(sink)
env.execute()
val expected = mutable.MutableList[String](
- "A,1970-01-01 00:00:08.000,3",
- "A,1970-01-01 00:00:12.000,3",
- "B,1970-01-01 00:00:08.000,1")
+ "A,1970-01-01T00:00:08,3",
+ "A,1970-01-01T00:00:12,3",
+ "B,1970-01-01T00:00:08,1")
assertEquals(expected.toList.sorted, sink.getAppendResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index 500ad35..db92495 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -43,7 +43,6 @@ import org.junit.Assert._
import org.junit.{Assert, Before}
import java.lang.{Iterable => JIterable}
-import java.util.TimeZone
import java.util.regex.Pattern
import scala.collection.JavaConverters._
@@ -53,7 +52,6 @@ import scala.util.Sorting
class BatchTestBase extends BatchAbstractTestBase {
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment.create(settings)
val tEnv: TableEnvironment = testingTableEnv
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index 31ab797..8067817 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -48,8 +48,6 @@ import _root_.scala.collection.mutable.ArrayBuffer
object StreamTestSink {
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
-
private[utils] val idCounter: AtomicInteger = new AtomicInteger(0)
private[utils] val globalResults =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestData.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestData.scala
index fe18c52..76c97aa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestData.scala
@@ -19,15 +19,15 @@
package org.apache.flink.table.runtime.utils
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.{DATE, TIME, TIMESTAMP}
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.{LOCAL_DATE, LOCAL_DATE_TIME, LOCAL_TIME}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.util.DateTimeTestUtil._
import org.apache.flink.types.Row
import java.math.{BigDecimal => JBigDecimal}
-import java.sql.Timestamp
import scala.collection.{Seq, mutable}
@@ -40,13 +40,14 @@ object TestData {
val type4 = new RowTypeInfo(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
val type5 = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO,
LONG_TYPE_INFO)
- val type6 = new RowTypeInfo(INT_TYPE_INFO, DOUBLE_TYPE_INFO, STRING_TYPE_INFO, DATE, TIME,
- TIMESTAMP)
+ val type6 = new RowTypeInfo(INT_TYPE_INFO, DOUBLE_TYPE_INFO, STRING_TYPE_INFO, LOCAL_DATE,
+ LOCAL_TIME, LOCAL_DATE_TIME)
val simpleType2 = new RowTypeInfo(INT_TYPE_INFO, DOUBLE_TYPE_INFO)
val buildInType = new RowTypeInfo(BOOLEAN_TYPE_INFO, BYTE_TYPE_INFO, INT_TYPE_INFO,
- LONG_TYPE_INFO, DOUBLE_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DATE, TIME, TIMESTAMP)
+ LONG_TYPE_INFO, DOUBLE_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, LOCAL_DATE, LOCAL_TIME,
+ LOCAL_DATE_TIME)
val numericType = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, FLOAT_TYPE_INFO,
DOUBLE_TYPE_INFO, BIG_DEC_TYPE_INFO)
@@ -56,7 +57,7 @@ object TestData {
val genericType5 = new RowTypeInfo(tupleIntInt, LONG_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO,
LONG_TYPE_INFO)
val type3WithTimestamp = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO,
- TIMESTAMP)
+ LOCAL_DATE_TIME)
val nullablesOfData1 = Array(false, false, false)
@@ -162,14 +163,14 @@ object TestData {
val nullablesOfSmallData5 = Array(false, false, false, false, false)
lazy val buildInData: Seq[Row] = Seq(
- row(false, 1.toByte, 2, 3L, 2.56, "abcd", "f%g", UTCDate("2017-12-12"),
- UTCTime("10:08:09"), UTCTimestamp("2017-11-11 20:32:19")),
+ row(false, 1.toByte, 2, 3L, 2.56, "abcd", "f%g", localDate("2017-12-12"),
+ localTime("10:08:09"), localDateTime("2017-11-11 20:32:19")),
- row(null, 2.toByte, -3, -4L, 90.08, null, "hij_k", UTCDate("2017-12-12"),
- UTCTime("10:08:09"), UTCTimestamp("2017-11-11 20:32:19")),
+ row(null, 2.toByte, -3, -4L, 90.08, null, "hij_k", localDate("2017-12-12"),
+ localTime("10:08:09"), localDateTime("2017-11-11 20:32:19")),
row(true, 3.toByte, -4, -5L, -0.8, "e fg", null, null,
- UTCTime("10:08:09"), UTCTimestamp("2015-05-20 10:00:00.887"))
+ localTime("10:08:09"), localDateTime("2015-05-20 10:00:00.887"))
)
lazy val simpleData2 = Seq(
@@ -234,27 +235,27 @@ object TestData {
val nullablesOfData3WithTimestamp = Array(true, false, false, false)
lazy val data3WithTimestamp: Seq[Row] = Seq(
- row(2, 2L, "Hello", new Timestamp(2000L)),
- row(1, 1L, "Hi", new Timestamp(1000L)),
- row(3, 2L, "Hello world", new Timestamp(3000L)),
- row(4, 3L, "Hello world, how are you?", new Timestamp(4000L)),
- row(5, 3L, "I am fine.", new Timestamp(5000L)),
- row(6, 3L, "Luke Skywalker", new Timestamp(6000L)),
- row(7, 4L, "Comment#1", new Timestamp(7000L)),
- row(8, 4L, "Comment#2", new Timestamp(8000L)),
- row(9, 4L, "Comment#3", new Timestamp(9000L)),
- row(10, 4L, "Comment#4", new Timestamp(10000L)),
- row(11, 5L, "Comment#5", new Timestamp(11000L)),
- row(12, 5L, "Comment#6", new Timestamp(12000L)),
- row(13, 5L, "Comment#7", new Timestamp(13000L)),
- row(15, 5L, "Comment#9", new Timestamp(15000L)),
- row(14, 5L, "Comment#8", new Timestamp(14000L)),
- row(16, 6L, "Comment#10", new Timestamp(16000L)),
- row(17, 6L, "Comment#11", new Timestamp(17000L)),
- row(18, 6L, "Comment#12", new Timestamp(18000L)),
- row(19, 6L, "Comment#13", new Timestamp(19000L)),
- row(20, 6L, "Comment#14", new Timestamp(20000L)),
- row(21, 6L, "Comment#15", new Timestamp(21000L))
+ row(2, 2L, "Hello", unixTimestampToLocalDateTime(2000L)),
+ row(1, 1L, "Hi", unixTimestampToLocalDateTime(1000L)),
+ row(3, 2L, "Hello world", unixTimestampToLocalDateTime(3000L)),
+ row(4, 3L, "Hello world, how are you?", unixTimestampToLocalDateTime(4000L)),
+ row(5, 3L, "I am fine.", unixTimestampToLocalDateTime(5000L)),
+ row(6, 3L, "Luke Skywalker", unixTimestampToLocalDateTime(6000L)),
+ row(7, 4L, "Comment#1", unixTimestampToLocalDateTime(7000L)),
+ row(8, 4L, "Comment#2", unixTimestampToLocalDateTime(8000L)),
+ row(9, 4L, "Comment#3", unixTimestampToLocalDateTime(9000L)),
+ row(10, 4L, "Comment#4", unixTimestampToLocalDateTime(10000L)),
+ row(11, 5L, "Comment#5", unixTimestampToLocalDateTime(11000L)),
+ row(12, 5L, "Comment#6", unixTimestampToLocalDateTime(12000L)),
+ row(13, 5L, "Comment#7", unixTimestampToLocalDateTime(13000L)),
+ row(15, 5L, "Comment#9", unixTimestampToLocalDateTime(15000L)),
+ row(14, 5L, "Comment#8", unixTimestampToLocalDateTime(14000L)),
+ row(16, 6L, "Comment#10", unixTimestampToLocalDateTime(16000L)),
+ row(17, 6L, "Comment#11", unixTimestampToLocalDateTime(17000L)),
+ row(18, 6L, "Comment#12", unixTimestampToLocalDateTime(18000L)),
+ row(19, 6L, "Comment#13", unixTimestampToLocalDateTime(19000L)),
+ row(20, 6L, "Comment#14", unixTimestampToLocalDateTime(20000L)),
+ row(21, 6L, "Comment#15", unixTimestampToLocalDateTime(21000L))
)
lazy val smallNestedTupleData: Seq[((Int, Int), String)] = {
@@ -290,36 +291,36 @@ object TestData {
val nullablesOfData5 = Array(false, false, false, false, false)
lazy val data6: Seq[Row] = Seq(
- row(1, 1.1, "a", UTCDate("2017-04-08"), UTCTime("12:00:59"),
- UTCTimestamp("2015-05-20 10:00:00")),
- row(2, 2.5, "abc", UTCDate("2017-04-09"), UTCTime("12:00:59"),
- UTCTimestamp("2019-09-19 08:03:09")),
- row(2, -2.4, "abcd", UTCDate("2017-04-08"), UTCTime("00:00:00"),
- UTCTimestamp("2016-09-01 23:07:06")),
- row(3, 0.0, "abc?", UTCDate("2017-10-11"), UTCTime("23:59:59"),
- UTCTimestamp("1999-12-12 10:00:00")),
- row(3, -9.77, "ABC", UTCDate("2016-08-08"), UTCTime("04:15:00"),
- UTCTimestamp("1999-12-12 10:00:02")),
- row(3, 0.08, "BCD", UTCDate("2017-04-10"), UTCTime("02:30:00"),
- UTCTimestamp("1999-12-12 10:03:00")),
- row(4, 3.14, "CDE", UTCDate("2017-11-11"), UTCTime("02:30:00"),
- UTCTimestamp("2017-11-20 09:00:00")),
- row(4, 3.15, "DEF", UTCDate("2017-02-06"), UTCTime("06:00:00"),
- UTCTimestamp("2015-11-19 10:00:00")),
- row(4, 3.14, "EFG", UTCDate("2017-05-20"), UTCTime("09:45:78"),
- UTCTimestamp("2015-11-19 10:00:01")),
- row(4, 3.16, "FGH", UTCDate("2017-05-19"), UTCTime("11:11:11"),
- UTCTimestamp("2015-11-20 08:59:59")),
- row(5, -5.9, "GHI", UTCDate("2017-07-20"), UTCTime("22:22:22"),
- UTCTimestamp("1989-06-04 10:00:00.78")),
- row(5, 2.71, "HIJ", UTCDate("2017-09-08"), UTCTime("20:09:09"),
- UTCTimestamp("1997-07-01 09:00:00.99")),
- row(5, 3.9, "IJK", UTCDate("2017-02-02"), UTCTime("03:03:03"),
- UTCTimestamp("2000-01-01 00:00:00.09")),
- row(5, 0.7, "JKL", UTCDate("2017-10-01"), UTCTime("19:00:00"),
- UTCTimestamp("2010-06-01 10:00:00.999")),
- row(5, -2.8, "KLM", UTCDate("2017-07-01"), UTCTime("12:00:59"),
- UTCTimestamp("1937-07-07 08:08:08.888"))
+ row(1, 1.1, "a", localDate("2017-04-08"), localTime("12:00:59"),
+ localDateTime("2015-05-20 10:00:00")),
+ row(2, 2.5, "abc", localDate("2017-04-09"), localTime("12:00:59"),
+ localDateTime("2019-09-19 08:03:09")),
+ row(2, -2.4, "abcd", localDate("2017-04-08"), localTime("00:00:00"),
+ localDateTime("2016-09-01 23:07:06")),
+ row(3, 0.0, "abc?", localDate("2017-10-11"), localTime("23:59:59"),
+ localDateTime("1999-12-12 10:00:00")),
+ row(3, -9.77, "ABC", localDate("2016-08-08"), localTime("04:15:00"),
+ localDateTime("1999-12-12 10:00:02")),
+ row(3, 0.08, "BCD", localDate("2017-04-10"), localTime("02:30:00"),
+ localDateTime("1999-12-12 10:03:00")),
+ row(4, 3.14, "CDE", localDate("2017-11-11"), localTime("02:30:00"),
+ localDateTime("2017-11-20 09:00:00")),
+ row(4, 3.15, "DEF", localDate("2017-02-06"), localTime("06:00:00"),
+ localDateTime("2015-11-19 10:00:00")),
+ row(4, 3.14, "EFG", localDate("2017-05-20"), localTime("09:45:78"),
+ localDateTime("2015-11-19 10:00:01")),
+ row(4, 3.16, "FGH", localDate("2017-05-19"), localTime("11:11:11"),
+ localDateTime("2015-11-20 08:59:59")),
+ row(5, -5.9, "GHI", localDate("2017-07-20"), localTime("22:22:22"),
+ localDateTime("1989-06-04 10:00:00.78")),
+ row(5, 2.71, "HIJ", localDate("2017-09-08"), localTime("20:09:09"),
+ localDateTime("1997-07-01 09:00:00.99")),
+ row(5, 3.9, "IJK", localDate("2017-02-02"), localTime("03:03:03"),
+ localDateTime("2000-01-01 00:00:00.09")),
+ row(5, 0.7, "JKL", localDate("2017-10-01"), localTime("19:00:00"),
+ localDateTime("2010-06-01 10:00:00.999")),
+ row(5, -2.8, "KLM", localDate("2017-07-01"), localTime("12:00:59"),
+ localDateTime("1937-07-07 08:08:08.888"))
)
val nullablesOfData6 = Array(false, false, false, false, false, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
index 3aa6d50..87daa88 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -18,22 +18,24 @@
package org.apache.flink.table.runtime.utils
-import java.lang.{Iterable => JIterable}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple1, Tuple2}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, PojoField, PojoTypeInfo, RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.dataformat.{BaseRow, BinaryString}
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction}
+import org.apache.flink.types.Row
+
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, PojoField, PojoTypeInfo, RowTypeInfo}
-import org.apache.flink.api.scala.typeutils.Types
-import org.apache.flink.table.dataformat.{BaseRow, BinaryString}
-import org.apache.flink.types.Row
import java.io.File
-import java.sql.Timestamp
+import java.lang.{Iterable => JIterable}
+import java.sql.{Date, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
@@ -200,10 +202,27 @@ object UserDefinedFunctionTestUtils {
}
object DateFunction extends ScalarFunction {
- def eval(d: Integer): Integer = d
+ def eval(d: Date): String = d.toString
+ }
+
+ object LocalDateFunction extends ScalarFunction {
+ def eval(d: LocalDate): String = d.toString
+ }
+
+ object TimestampFunction extends ScalarFunction {
+ def eval(t: java.sql.Timestamp): String = t.toString
+ }
+
+ object DateTimeFunction extends ScalarFunction {
+ def eval(t: LocalDateTime): String = t.toString
+ }
+
+ object TimeFunction extends ScalarFunction {
+ def eval(t: java.sql.Time): String = t.toString
+ }
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
- SqlTimeTypeInfo.DATE
+ object LocalTimeFunction extends ScalarFunction {
+ def eval(t: LocalTime): String = t.toString
}
// Understand type: Row wrapped as TypeInfoWrappedDataType.
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/DateTimeTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/DateTimeTestUtil.scala
index 05b3f0e..f0b664b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/DateTimeTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/DateTimeTestUtil.scala
@@ -18,21 +18,24 @@
package org.apache.flink.table.util
+import org.apache.flink.table.dataformat.DataFormatConverters.{LocalDateConverter, LocalDateTimeConverter, LocalTimeConverter}
+
import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate
-import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
object DateTimeTestUtil {
- def UTCDate(s: String): Date = {
- new Date(DateTimeUtils.dateStringToUnixDate(s) * DateTimeUtils.MILLIS_PER_DAY)
+ def localDate(s: String): LocalDate = {
+ LocalDateConverter.INSTANCE.toExternal(dateStringToUnixDate(s))
}
- def UTCTime(s: String): Time = {
- new Time(DateTimeUtils.timeStringToUnixDate(s).longValue())
+ def localTime(s: String): LocalTime = {
+ LocalTimeConverter.INSTANCE.toExternal(DateTimeUtils.timeStringToUnixDate(s))
}
- def UTCTimestamp(s: String): Timestamp = {
- new Timestamp(DateTimeUtils.timestampStringToUnixDate(s))
+ def localDateTime(s: String): LocalDateTime = {
+ LocalDateTimeConverter.INSTANCE.toExternal(DateTimeUtils.timestampStringToUnixDate(s))
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index e160b8b..1309be4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -21,21 +21,17 @@ package org.apache.flink.table.api.validation
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
-import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
-import org.apache.flink.table.types.DataType
import org.apache.flink.table.utils.{TableTestBase, TestTableSourceWithTime}
import org.apache.flink.types.Row
import org.junit.Test
-import java.time.LocalDateTime
import java.util
import java.util.Collections
@@ -275,36 +271,6 @@ class TableSourceValidationTest extends TableTestBase{
tEnv.registerTableSource("testTable", ts)
}
- @Test
- def testLocalDateTimeInTableField(): Unit = {
- expectedException.expect(classOf[TableException])
- expectedException.expectMessage("Unsupported conversion from data type 'TIMESTAMP(3)'" +
- " (conversion class: java.time.LocalDateTime) to type information")
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
-
- tEnv.registerTableSource("table", new StreamTableSource[Row] {
-
- override def getDataStream(execEnv: JExecEnv): DataStream[Row] = env.getJavaEnv
- .fromElements(Row.of(LocalDateTime.of(1, 1, 1, 1, 1)))
-
- override def getTableSchema: TableSchema = TableSchema.builder()
- .field("f0", DataTypes.TIMESTAMP(3).bridgedTo(classOf[LocalDateTime]))
- .build()
-
- override def getProducedDataType: DataType = DataTypes
- .ROW(DataTypes
- .FIELD(
- "f0",
- DataTypes.TIMESTAMP(3).bridgedTo(classOf[LocalDateTime])))
- })
-
- val table = tEnv.scan("table").select("f0")
-
- tEnv.explain(table)
- // expression thrown, legacy planner does not support LocalDateTime
- }
-
// CsvTableSource Tests
@Test(expected = classOf[IllegalArgumentException])
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index af32a51..c907f42 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -53,6 +53,9 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -103,14 +106,17 @@ public class DataFormatConverters {
t2C.put(DataTypes.TINYINT().bridgedTo(byte.class), ByteConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(Date.class), DateConverter.INSTANCE);
+ t2C.put(DataTypes.DATE().bridgedTo(LocalDate.class), LocalDateConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(Integer.class), IntConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(int.class), IntConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(Time.class), TimeConverter.INSTANCE);
+ t2C.put(DataTypes.TIME().bridgedTo(LocalTime.class), LocalTimeConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(Integer.class), IntConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(int.class), IntConverter.INSTANCE);
t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class), TimestampConverter.INSTANCE);
+ t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class), LocalDateTimeConverter.INSTANCE);
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class), IntConverter.INSTANCE);
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(int.class), IntConverter.INSTANCE);
@@ -128,7 +134,7 @@ public class DataFormatConverters {
* lost its specific Java format. Only DataType retains all its
* Java format information.
*/
- @SuppressWarnings("unchecked")
+ @Deprecated
public static DataFormatConverter getConverterForDataType(DataType originDataType) {
DataType dataType = originDataType.nullable();
DataFormatConverter converter = TYPE_TO_CONVERTER.get(dataType);
@@ -626,6 +632,87 @@ public class DataFormatConverters {
}
/**
+ * Converter for LocalDate.
+ *
+ * <p>The internal form is an {@code Integer}. Conversion is delegated to
+ * {@code SqlDateTimeUtils.localDateToUnixDate} (external to internal) and
+ * {@code SqlDateTimeUtils.unixDateToLocalDate} (internal to external).
+ * Instances are obtained through {@code INSTANCE}; the constructor is private.
+ */
+ public static final class LocalDateConverter extends DataFormatConverter<Integer, LocalDate> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalDateConverter INSTANCE = new LocalDateConverter();
+
+ private LocalDateConverter() {} // singleton: use INSTANCE
+
+ @Override
+ Integer toInternalImpl(LocalDate value) {
+ return SqlDateTimeUtils.localDateToUnixDate(value);
+ }
+
+ @Override
+ LocalDate toExternalImpl(Integer value) {
+ return SqlDateTimeUtils.unixDateToLocalDate(value);
+ }
+
+ @Override
+ LocalDate toExternalImpl(BaseRow row, int column) {
+ return toExternalImpl(row.getInt(column)); // the field is read from the row as an int
+ }
+ }
+
+ /**
+ * Converter for LocalTime.
+ *
+ * <p>The internal form is an {@code Integer}. Conversion is delegated to
+ * {@code SqlDateTimeUtils.localTimeToUnixDate} (external to internal) and
+ * {@code SqlDateTimeUtils.unixTimeToLocalTime} (internal to external).
+ * Instances are obtained through {@code INSTANCE}; the constructor is private.
+ */
+ public static final class LocalTimeConverter extends DataFormatConverter<Integer, LocalTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalTimeConverter INSTANCE = new LocalTimeConverter();
+
+ private LocalTimeConverter() {} // singleton: use INSTANCE
+
+ @Override
+ Integer toInternalImpl(LocalTime value) {
+ return SqlDateTimeUtils.localTimeToUnixDate(value);
+ }
+
+ @Override
+ LocalTime toExternalImpl(Integer value) {
+ return SqlDateTimeUtils.unixTimeToLocalTime(value);
+ }
+
+ @Override
+ LocalTime toExternalImpl(BaseRow row, int column) {
+ return toExternalImpl(row.getInt(column)); // the field is read from the row as an int
+ }
+ }
+
+ /**
+ * Converter for LocalDateTime.
+ *
+ * <p>The internal form is a {@code Long}. Conversion is delegated to
+ * {@code SqlDateTimeUtils.localDateTimeToUnixTimestamp} (external to internal) and
+ * {@code SqlDateTimeUtils.unixTimestampToLocalDateTime} (internal to external).
+ * Instances are obtained through {@code INSTANCE}; the constructor is private.
+ */
+ public static final class LocalDateTimeConverter extends DataFormatConverter<Long, LocalDateTime> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final LocalDateTimeConverter INSTANCE = new LocalDateTimeConverter();
+
+ private LocalDateTimeConverter() {} // singleton: use INSTANCE
+
+ @Override
+ Long toInternalImpl(LocalDateTime value) {
+ return SqlDateTimeUtils.localDateTimeToUnixTimestamp(value);
+ }
+
+ @Override
+ LocalDateTime toExternalImpl(Long value) {
+ return SqlDateTimeUtils.unixTimestampToLocalDateTime(value);
+ }
+
+ @Override
+ LocalDateTime toExternalImpl(BaseRow row, int column) {
+ return toExternalImpl(row.getLong(column)); // the field is read from the row as a long
+ }
+ }
+
+ /**
* Converter for date.
*/
public static final class DateConverter extends DataFormatConverter<Integer, Date> {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index 9921aac..bf46c2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -35,6 +35,8 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
@@ -54,6 +56,16 @@ public class SqlDateTimeUtils {
private static final int EPOCH_JULIAN = 2440588;
/**
+ * The number of milliseconds in a second.
+ */
+ private static final long MILLIS_PER_SECOND = 1000L;
+
+ /**
+ * The number of milliseconds in a minute.
+ */
+ private static final long MILLIS_PER_MINUTE = 60000L;
+
+ /**
* The number of milliseconds in an hour.
*/
private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
@@ -120,50 +132,22 @@ public class SqlDateTimeUtils {
/** Converts the internal representation of a SQL DATE (int) to the Java
* type used for UDF parameters ({@link java.sql.Date}). */
public static java.sql.Date internalToDate(int v) {
- return internalToDate(v, UTC_ZONE);
- }
-
- /**
- * Converts the internal representation of a SQL DATE (int) to the Java
- * type used for UDF parameters ({@link java.sql.Date}) with the given TimeZone.
- *
- * <p>The internal int represents the days since January 1, 1970. When we convert it
- * to {@link java.sql.Date} (time milliseconds since January 1, 1970, 00:00:00 GMT),
- * we need a TimeZone.
- */
- public static java.sql.Date internalToDate(int v, TimeZone tz) {
// note that, in this case, can't handle Daylight Saving Time
final long t = v * MILLIS_PER_DAY;
- return new java.sql.Date(t - tz.getOffset(t));
+ return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
}
/** Converts the internal representation of a SQL TIME (int) to the Java
* type used for UDF parameters ({@link java.sql.Time}). */
public static java.sql.Time internalToTime(int v) {
- return internalToTime(v, UTC_ZONE);
- }
-
- /**
- * Converts the internal representation of a SQL TIME (int) to the Java
- * type used for UDF parameters ({@link java.sql.Time}).
- *
- * <p>The internal int represents the seconds since "00:00:00". When we convert it to
- * {@link java.sql.Time} (time milliseconds since January 1, 1970, 00:00:00 GMT),
- * we need a TimeZone.
- */
- public static java.sql.Time internalToTime(int v, TimeZone tz) {
// note that, in this case, can't handle Daylight Saving Time
- return new java.sql.Time(v - tz.getOffset(v));
+ return new java.sql.Time(v - LOCAL_TZ.getOffset(v));
}
/** Converts the internal representation of a SQL TIMESTAMP (long) to the Java
- * type used for UDF parameters ({@link java.sql.Timestamp}).
- *
- * <p>The internal long represents the time milliseconds since January 1, 1970, 00:00:00 GMT.
- * So we don't need to take TimeZone into account.
- */
+ * type used for UDF parameters ({@link java.sql.Timestamp}). */
public static java.sql.Timestamp internalToTimestamp(long v) {
- return new java.sql.Timestamp(v);
+ return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
}
/** Converts the Java type used for UDF parameters of SQL DATE type
@@ -171,15 +155,7 @@ public class SqlDateTimeUtils {
*
* <p>Converse of {@link #internalToDate(int)}. */
public static int dateToInternal(java.sql.Date date) {
- return dateToInternal(date, UTC_ZONE);
- }
-
- /** Converts the Java type used for UDF parameters of SQL DATE type
- * ({@link java.sql.Date}) to internal representation (int).
- *
- * <p>Converse of {@link #internalToDate(int)}. */
- public static int dateToInternal(java.sql.Date date, TimeZone tz) {
- long ts = date.getTime() + tz.getOffset(date.getTime());
+ long ts = date.getTime() + LOCAL_TZ.getOffset(date.getTime());
return (int) (ts / MILLIS_PER_DAY);
}
@@ -188,15 +164,7 @@ public class SqlDateTimeUtils {
*
* <p>Converse of {@link #internalToTime(int)}. */
public static int timeToInternal(java.sql.Time time) {
- return timeToInternal(time, UTC_ZONE);
- }
-
- /** Converts the Java type used for UDF parameters of SQL TIME type
- * ({@link java.sql.Time}) to internal representation (int).
- *
- * <p>Converse of {@link #internalToTime(int)}. */
- public static int timeToInternal(java.sql.Time time, TimeZone tz) {
- long ts = time.getTime() + tz.getOffset(time.getTime());
+ long ts = time.getTime() + LOCAL_TZ.getOffset(time.getTime());
return (int) (ts % MILLIS_PER_DAY);
}
@@ -205,7 +173,8 @@ public class SqlDateTimeUtils {
*
* <p>Converse of {@link #internalToTimestamp(long)}. */
public static long timestampToInternal(java.sql.Timestamp ts) {
- return ts.getTime();
+ long time = ts.getTime();
+ return time + LOCAL_TZ.getOffset(time);
}
@@ -238,6 +207,10 @@ public class SqlDateTimeUtils {
// String --> Timestamp conversion
// --------------------------------------------------------------------------------------------
+ public static Long toTimestamp(String dateStr) { // convenience overload: same as the time-zone variant, called with UTC_ZONE
+ return toTimestamp(dateStr, UTC_ZONE);
+ }
+
/**
* Parse date time string to timestamp based on the given time zone and
* "yyyy-MM-dd HH:mm:ss" format. Returns null if parsing failed.
@@ -279,6 +252,10 @@ public class SqlDateTimeUtils {
}
}
+ public static Long toTimestamp(String dateStr, String format) { // convenience overload: same as the time-zone variant, called with UTC_ZONE
+ return toTimestamp(dateStr, format, UTC_ZONE);
+ }
+
/**
* Parse date time string to timestamp based on the given time zone string and format.
* Returns null if parsing failed.
@@ -357,6 +334,18 @@ public class SqlDateTimeUtils {
return dateFormat(dateStr, TIMESTAMP_FORMAT_STRING, toFormat, tz);
}
+ public static String dateFormat(long ts, String format) { // formats a long timestamp; time zone fixed to UTC_ZONE
+ return dateFormat(ts, format, UTC_ZONE);
+ }
+
+ public static String dateFormat(String dateStr, String fromFormat, String toFormat) { // re-formats a string from fromFormat to toFormat; time zone fixed to UTC_ZONE
+ return dateFormat(dateStr, fromFormat, toFormat, UTC_ZONE);
+ }
+
+ public static String dateFormat(String dateStr, String toFormat) { // re-formats a string to toFormat; time zone fixed to UTC_ZONE
+ return dateFormat(dateStr, toFormat, UTC_ZONE);
+ }
+
public static String dateFormatTz(long ts, String format, String tzStr) {
TimeZone tz = TIMEZONE_CACHE.get(tzStr);
return dateFormat(ts, format, tz);
@@ -383,6 +372,12 @@ public class SqlDateTimeUtils {
return convertTz(dateStr, TIMESTAMP_FORMAT_STRING, tzFrom, tzTo);
}
+ public static String timestampToString(long ts, int precision) { // formats ts with UTC_ZONE, using a format chosen by precision
+ int p = (precision <= 3 && precision >= 0) ? precision : 3; // any precision outside 0..3 falls back to 3
+ String format = DEFAULT_DATETIME_FORMATS[p]; // presumably one pattern per fractional-second precision; confirm against the array definition
+ return dateFormat(ts, format, UTC_ZONE);
+ }
+
/**
* Convert a timestamp to string.
* @param ts the timestamp to convert.
@@ -602,6 +597,10 @@ public class SqlDateTimeUtils {
// Floor/Ceil
// --------------------------------------------------------------------------------------------
+ public static long timestampFloor(TimeUnitRange range, long ts) { // convenience overload: floors ts to range with UTC_ZONE as the time zone
+ return timestampFloor(range, ts, UTC_ZONE);
+ }
+
public static long timestampFloor(TimeUnitRange range, long ts, TimeZone tz) {
// assume that we are at UTC timezone, just for algorithm performance
long offset = tz.getOffset(ts);
@@ -624,6 +623,10 @@ public class SqlDateTimeUtils {
}
}
+ public static long timestampCeil(TimeUnitRange range, long ts) { // convenience overload: same as the time-zone variant, called with UTC_ZONE
+ return timestampCeil(range, ts, UTC_ZONE);
+ }
+
/**
* Keep the algorithm consistent with Calcite DateTimeUtils.julianDateFloor, but here
* we take time zone into account.
@@ -748,6 +751,22 @@ public class SqlDateTimeUtils {
return dateDiff(t1, t2, tz);
}
+ public static int dateDiff(long t1, long t2) { // both operands as long timestamps; time zone fixed to UTC_ZONE
+ return dateDiff(t1, t2, UTC_ZONE);
+ }
+
+ public static int dateDiff(String t1Str, long t2) { // first operand as a string; time zone fixed to UTC_ZONE
+ return dateDiff(t1Str, t2, UTC_ZONE);
+ }
+
+ public static int dateDiff(long t1, String t2Str) { // second operand as a string; time zone fixed to UTC_ZONE
+ return dateDiff(t1, t2Str, UTC_ZONE);
+ }
+
+ public static int dateDiff(String t1Str, String t2Str) { // both operands as strings; time zone fixed to UTC_ZONE
+ return dateDiff(t1Str, t2Str, UTC_ZONE);
+ }
+
/**
* Do subtraction on date string.
*
@@ -780,6 +799,14 @@ public class SqlDateTimeUtils {
return dateFormat(resultTs, DATE_FORMAT_STRING, tz);
}
+ public static String dateSub(String dateStr, int days) { // string input; time zone fixed to UTC_ZONE
+ return dateSub(dateStr, days, UTC_ZONE);
+ }
+
+ public static String dateSub(long ts, int days) { // long timestamp input; time zone fixed to UTC_ZONE
+ return dateSub(ts, days, UTC_ZONE);
+ }
+
/**
* Do addition on date string.
*
@@ -810,6 +837,14 @@ public class SqlDateTimeUtils {
return dateFormat(resultTs, DATE_FORMAT_STRING, tz);
}
+ public static String dateAdd(String dateStr, int days) { // string input; time zone fixed to UTC_ZONE
+ return dateAdd(dateStr, days, UTC_ZONE);
+ }
+
+ public static String dateAdd(long ts, int days) { // long timestamp input; time zone fixed to UTC_ZONE
+ return dateAdd(ts, days, UTC_ZONE);
+ }
+
// --------------------------------------------------------------------------------------------
// UNIX TIME
// --------------------------------------------------------------------------------------------
@@ -870,6 +905,23 @@ public class SqlDateTimeUtils {
return fromUnixtime(Decimal.castToLong(unixtime), tz);
}
+ public static String fromUnixtime(long unixtime) { // long input; time zone fixed to UTC_ZONE
+ return fromUnixtime(unixtime, UTC_ZONE);
+
+ }
+
+ public static String fromUnixtime(long unixtime, String format) { // long input with an explicit output format; time zone fixed to UTC_ZONE
+ return fromUnixtime(unixtime, format, UTC_ZONE);
+ }
+
+ public static String fromUnixtime(double unixtime) { // double input; time zone fixed to UTC_ZONE
+ return fromUnixtime(unixtime, UTC_ZONE);
+ }
+
+ public static String fromUnixtime(Decimal unixtime) { // Decimal input; time zone fixed to UTC_ZONE
+ return fromUnixtime(unixtime, UTC_ZONE);
+ }
+
/**
* Returns a Unix timestamp in seconds since '1970-01-01 00:00:00' UTC as an unsigned
* integer.
@@ -900,10 +952,115 @@ public class SqlDateTimeUtils {
}
}
+ public static long unixTimestamp(String dateStr) { // string input; time zone fixed to UTC_ZONE
+ return unixTimestamp(dateStr, UTC_ZONE);
+ }
+
+ public static long unixTimestamp(String dateStr, String format) { // string input with an explicit input format; time zone fixed to UTC_ZONE
+ return unixTimestamp(dateStr, format, UTC_ZONE);
+ }
+
/**
* Converts a timestamp in milliseconds since '1970-01-01 00:00:00' UTC to whole seconds.
*
* <p>Uses integer division by 1000, so the sub-second part is dropped
* (truncation toward zero, which matters for negative timestamps).
*/
public static long unixTimestamp(long ts) {
return ts / 1000;
}
+
+ public static LocalDate unixDateToLocalDate(int date) { // date is a day count relative to the epoch; EPOCH_JULIAN shifts it to a Julian day number
+ return julianToLocalDate(date + EPOCH_JULIAN);
+ }
+
+ private static LocalDate julianToLocalDate(int julian) { // Julian day number -> LocalDate, using integer arithmetic only
+ // this shifts the epoch back to astronomical year -4800 instead of the
+ // start of the Christian era in year AD 1 of the proleptic Gregorian
+ // calendar.
+ int j = julian + 32044; // NOTE(review): '/' and '%' below truncate toward zero, so this presumably requires j >= 0; confirm callers
+ int g = j / 146097; // 146097 = 400 * 365 + 97: index of the 400-year cycle
+ int dg = j % 146097; // day offset inside that cycle
+ int c = (dg / 36524 + 1) * 3 / 4; // 36524 = 100 * 365 + 24: century index inside the cycle
+ int dc = dg - c * 36524; // day offset inside that century
+ int b = dc / 1461; // 1461 = 4 * 365 + 1: index of the 4-year group
+ int db = dc % 1461; // day offset inside that group
+ int a = (db / 365 + 1) * 3 / 4; // year index inside the 4-year group
+ int da = db - a * 365; // day offset inside that year
+
+ // integer number of full years elapsed since March 1, 4801 BC
+ int y = g * 400 + c * 100 + b * 4 + a;
+ // integer number of full months elapsed since the last March 1
+ int m = (da * 5 + 308) / 153 - 2;
+ // number of days elapsed since day 1 of the month
+ int d = da - (m + 4) * 153 / 5 + 122;
+ int year = y - 4800 + (m + 2) / 12; // (m + 2) / 12 carries into the next calendar year
+ int month = (m + 2) % 12 + 1; // back to a 1-based calendar month
+ int day = d + 1; // back to a 1-based day of month
+ return LocalDate.of(year, month, day);
+ }
+
+ public static int localDateToUnixDate(LocalDate date) { // splits the date into year/month/day and delegates to ymdToUnixDate
+ return ymdToUnixDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth());
+ }
+
+ private static int ymdToUnixDate(int year, int month, int day) { // year/month/day -> day count relative to EPOCH_JULIAN
+ final int julian = ymdToJulian(year, month, day);
+ return julian - EPOCH_JULIAN; // inverse of the "+ EPOCH_JULIAN" shift in unixDateToLocalDate
+ }
+
+ private static int ymdToJulian(int year, int month, int day) { // year/month/day -> Julian day number, using integer arithmetic only
+ int a = (14 - month) / 12; // 1 for January and February, 0 for March..December
+ int y = year + 4800 - a; // January/February are counted with the previous year
+ int m = month + 12 * a - 3; // month index with March = 0 ... February = 11
+ return day + (153 * m + 2) / 5
+ + 365 * y
+ + y / 4
+ - y / 100
+ + y / 400
+ - 32045;
+ }
+
+ public static LocalTime unixTimeToLocalTime(int time) { // time is split as milliseconds into hour / minute / second / millisecond
+ int h = time / 3600000; // whole hours (3600000 ms per hour)
+ int time2 = time % 3600000; // milliseconds left after the hours
+ int m = time2 / 60000; // whole minutes (60000 ms per minute)
+ int time3 = time2 % 60000; // milliseconds left after the minutes
+ int s = time3 / 1000; // whole seconds
+ int ms = time3 % 1000; // remaining milliseconds
+ return LocalTime.of(h, m, s, ms * 1000_000); // milliseconds -> nanoseconds; NOTE(review): no range check here, LocalTime.of rejects out-of-range fields
+ }
+
+ public static int localTimeToUnixDate(LocalTime time) { // NOTE(review): despite "UnixDate" in the name, the result is milliseconds since midnight
+ return time.getHour() * (int) MILLIS_PER_HOUR
+ + time.getMinute() * (int) MILLIS_PER_MINUTE
+ + time.getSecond() * (int) MILLIS_PER_SECOND
+ + time.getNano() / 1000_000; // nanoseconds -> milliseconds; sub-millisecond precision is dropped
+ }
+
+ public static LocalDateTime unixTimestampToLocalDateTime(long timestamp) { // splits the millisecond timestamp into a day count and milliseconds of day
+ int date = (int) (timestamp / MILLIS_PER_DAY); // whole days; truncates toward zero
+ int time = (int) (timestamp % MILLIS_PER_DAY); // remainder; negative when timestamp is negative and not a whole day
+ if (time < 0) { // borrow one day so the time-of-day part is non-negative
+ --date;
+ time += MILLIS_PER_DAY;
+ }
+ LocalDate localDate = unixDateToLocalDate(date);
+ LocalTime localTime = unixTimeToLocalTime(time);
+ return LocalDateTime.of(localDate, localTime);
+ }
+
+ public static long localDateTimeToUnixTimestamp(LocalDateTime dateTime) { // decomposes dateTime into its fields and delegates to the 7-argument unixTimestamp
+ return unixTimestamp(
+ dateTime.getYear(), dateTime.getMonthValue(), dateTime.getDayOfMonth(),
+ dateTime.getHour(), dateTime.getMinute(), dateTime.getSecond(),
+ dateTime.getNano() / 1000_000); // nanoseconds -> milliseconds; sub-millisecond precision is dropped
+ }
+
+ private static long unixTimestamp(int year, int month, int day, int hour, // date and time fields -> a single long in milliseconds
+ int minute, int second, int mills) { // mills: the millisecond field, added unscaled below
+ final int date = ymdToUnixDate(year, month, day);
+ return (long) date * MILLIS_PER_DAY // widened to long before multiplying
+ + (long) hour * MILLIS_PER_HOUR
+ + (long) minute * MILLIS_PER_MINUTE
+ + (long) second * MILLIS_PER_SECOND
+ + mills;
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/window/WindowOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/window/WindowOperatorBuilder.java
index fd4009a..c381386 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/window/WindowOperatorBuilder.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/window/WindowOperatorBuilder.java
@@ -80,17 +80,15 @@ public class WindowOperatorBuilder {
return this;
}
- public WindowOperatorBuilder tumble(Duration size, long offset) {
+ public WindowOperatorBuilder tumble(Duration size) {
checkArgument(windowAssigner == null);
- this.windowAssigner = TumblingWindowAssigner.of(size)
- .withOffset(Duration.ofMillis(-offset));
+ this.windowAssigner = TumblingWindowAssigner.of(size);
return this;
}
- public WindowOperatorBuilder sliding(Duration size, Duration slide, long offset) {
+ public WindowOperatorBuilder sliding(Duration size, Duration slide) {
checkArgument(windowAssigner == null);
- this.windowAssigner = SlidingWindowAssigner.of(size, slide)
- .withOffset(Duration.ofMillis(-offset));
+ this.windowAssigner = SlidingWindowAssigner.of(size, slide);
return this;
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
index a34d434..6d7937e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
@@ -18,116 +18,23 @@
package org.apache.flink.table.types;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.dataformat.BaseArray;
import org.apache.flink.table.dataformat.BaseMap;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TypeInformationAnyType;
import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.util.Map;
-
-import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
/**
* Get internal(sql engine execution data formats) and default external class for {@link LogicalType}.
*/
public class ClassLogicalTypeConverter {
- /**
- * Get default external class for {@link LogicalType}.
- * TODO change TimestampType default conversion class to {@link LocalDateTime} from {@link Timestamp}.
- * TODO relace it with getting class from {@link TypeConversions#fromLogicalToDataType}.
- */
@Deprecated
public static Class getDefaultExternalClassForType(LogicalType type) {
- switch (type.getTypeRoot()) {
- case BOOLEAN:
- return Boolean.class;
- case TINYINT:
- return Byte.class;
- case SMALLINT:
- return Short.class;
- case INTEGER:
- return Integer.class;
- case DATE:
- return Date.class;
- case TIME_WITHOUT_TIME_ZONE:
- return Time.class;
- case INTERVAL_YEAR_MONTH:
- return Integer.class;
- case BIGINT:
- return Long.class;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return Timestamp.class;
- case INTERVAL_DAY_TIME:
- return Long.class;
- case FLOAT:
- return Float.class;
- case DOUBLE:
- return Double.class;
- case CHAR:
- case VARCHAR:
- return String.class;
- case DECIMAL:
- return BigDecimal.class;
- case ARRAY:
- if (type instanceof LegacyTypeInformationType) {
- return ((LegacyTypeInformationType) type).getTypeInformation().getTypeClass();
- }
- ArrayType arrayType = (ArrayType) type;
- LogicalType elementType = arrayType.getElementType();
- if (elementType.isNullable() || !isPrimitive(elementType)) {
- return Array.newInstance(getDefaultExternalClassForType(elementType), 0).getClass();
- } else {
- switch (arrayType.getElementType().getTypeRoot()) {
- case BOOLEAN:
- return boolean[].class;
- case TINYINT:
- return byte[].class;
- case SMALLINT:
- return short[].class;
- case INTEGER:
- return int[].class;
- case BIGINT:
- return long[].class;
- case FLOAT:
- return float[].class;
- case DOUBLE:
- return double[].class;
- default:
- throw new RuntimeException("Not support type: " + type);
- }
- }
- case MAP:
- case MULTISET:
- return Map.class;
- case ROW:
- return Row.class;
- case BINARY:
- case VARBINARY:
- return byte[].class;
- case ANY:
- TypeInformation typeInfo = type instanceof LegacyTypeInformationType ?
- ((LegacyTypeInformationType) type).getTypeInformation() :
- ((TypeInformationAnyType) type).getTypeInformation();
- return typeInfo.getTypeClass();
- default:
- throw new RuntimeException("Not support type: " + type);
- }
+ return TypeConversions.fromLogicalToDataType(type).getConversionClass();
}
/**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/LogicalTypeDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/LogicalTypeDataTypeConverter.java
index 88def7d..a61c420 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/LogicalTypeDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/LogicalTypeDataTypeConverter.java
@@ -38,10 +38,6 @@ import org.apache.flink.table.typeutils.BigDecimalTypeInfo;
import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -50,7 +46,6 @@ import static org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromType
/**
* Converter between {@link DataType} and {@link LogicalType}.
- * TODO change TimestampType default conversion class to {@link LocalDateTime} from {@link Timestamp}.
*
* <p>This class is for:
* 1.Source, Sink.
@@ -60,34 +55,8 @@ import static org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromType
@Deprecated
public class LogicalTypeDataTypeConverter {
- /**
- * DATE, TIME, TIMESTAMP use {@link Timestamp} instead of {@link LocalDateTime}.
- */
public static DataType fromLogicalTypeToDataType(LogicalType logicalType) {
- DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
- switch (logicalType.getTypeRoot()) {
- case DATE:
- return dataType.bridgedTo(Date.class);
- case TIME_WITHOUT_TIME_ZONE:
- return dataType.bridgedTo(Time.class);
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return dataType.bridgedTo(Timestamp.class);
- case ARRAY:
- return DataTypes.ARRAY(fromLogicalTypeToDataType(logicalType.getChildren().get(0)));
- case MAP:
- return DataTypes.MAP(
- fromLogicalTypeToDataType(logicalType.getChildren().get(0)),
- fromLogicalTypeToDataType(logicalType.getChildren().get(1)));
- case MULTISET:
- return DataTypes.MULTISET(fromLogicalTypeToDataType(logicalType.getChildren().get(0)));
- case ROW:
- RowType rowType = (RowType) logicalType;
- return DataTypes.ROW(rowType.getFields().stream()
- .map(rowField -> DataTypes.FIELD(rowField.getName(), fromLogicalTypeToDataType(rowField.getType())))
- .toArray(DataTypes.Field[]::new));
- default:
- return dataType;
- }
+ return TypeConversions.fromLogicalToDataType(logicalType);
}
/**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
index 2996aae..10987ff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
@@ -157,11 +157,11 @@ public class BaseRowTypeInfo extends TupleTypeInfoBase<BaseRow> {
@Override
public String toString() {
StringBuilder bld = new StringBuilder("BaseRow");
- if (types.length > 0) {
- bld.append('(').append(fieldNames[0]).append(": ").append(types[0]);
+ if (logicalTypes.length > 0) {
+ bld.append('(').append(fieldNames[0]).append(": ").append(logicalTypes[0]);
- for (int i = 1; i < types.length; i++) {
- bld.append(", ").append(fieldNames[i]).append(": ").append(types[i]);
+ for (int i = 1; i < logicalTypes.length; i++) {
+ bld.append(", ").append(fieldNames[i]).append(": ").append(logicalTypes[i]);
}
bld.append(')');
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
index 8e675d8..684694a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.dataformat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,8 +44,6 @@ import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
@@ -76,9 +74,9 @@ public class DataFormatConvertersTest {
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO,
- SqlTimeTypeInfo.DATE,
- SqlTimeTypeInfo.TIME,
- SqlTimeTypeInfo.TIMESTAMP,
+ LocalTimeTypeInfo.LOCAL_DATE,
+ LocalTimeTypeInfo.LOCAL_TIME,
+ LocalTimeTypeInfo.LOCAL_DATE_TIME,
BinaryStringTypeInfo.INSTANCE
};
@@ -103,9 +101,9 @@ public class DataFormatConvertersTest {
new byte[] {5, 1},
new char[] {5, 1},
- SqlDateTimeUtils.internalToDate(5),
- new Time(11),
- new Timestamp(11),
+ SqlDateTimeUtils.unixDateToLocalDate(5),
+ SqlDateTimeUtils.unixTimeToLocalTime(11),
+ SqlDateTimeUtils.unixTimestampToLocalDateTime(11),
BinaryString.fromString("hahah")
};
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
index f2647d0..596b479 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
@@ -101,7 +101,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1), 0)
+ .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
.withEventTime(2)
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.build();
@@ -188,7 +188,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1), 0)
+ .sliding(Duration.ofSeconds(3), Duration.ofSeconds(1))
.withProcessingTime()
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.build();
@@ -251,7 +251,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(3), 0)
+ .tumble(Duration.ofSeconds(3))
.withEventTime(2)
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.build();
@@ -333,7 +333,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(3), 0)
+ .tumble(Duration.ofSeconds(3))
.withEventTime(2)
.triggering(
EventTimeTriggers
@@ -451,7 +451,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(3), 0)
+ .tumble(Duration.ofSeconds(3))
.withEventTime(2)
.triggering(
EventTimeTriggers
@@ -577,7 +577,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(3), 0)
+ .tumble(Duration.ofSeconds(3))
.withProcessingTime()
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.build();
@@ -809,7 +809,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(2), 0)
+ .tumble(Duration.ofSeconds(2))
.withEventTime(2)
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.withAllowedLateness(Duration.ofMillis(500))
@@ -864,7 +864,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofMillis(windowSize), 0)
+ .tumble(Duration.ofMillis(windowSize))
.withEventTime(2)
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.withAllowedLateness(Duration.ofMillis(lateness))
@@ -920,7 +920,7 @@ public class WindowOperatorTest {
WindowOperator operator = WindowOperatorBuilder
.builder()
.withInputFields(inputFieldTypes)
- .tumble(Duration.ofSeconds(windowSize), 0)
+ .tumble(Duration.ofSeconds(windowSize))
.withEventTime(2)
.aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
.withAllowedLateness(Duration.ofMillis(lateness))
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/grouping/HeapWindowsGroupingTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/grouping/HeapWindowsGroupingTest.java
index ee4515b..c7ca651 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/grouping/HeapWindowsGroupingTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/grouping/HeapWindowsGroupingTest.java
@@ -20,13 +20,11 @@ package org.apache.flink.table.runtime.window.grouping;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
import org.apache.flink.table.runtime.util.RowIterator;
import org.junit.Test;
import java.io.IOException;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -385,7 +383,7 @@ public class HeapWindowsGroupingTest {
if (assignedWindowStart.get(count) == null) {
writer.setNullAt(0);
} else {
- writer.writeLong(0, SqlDateTimeUtils.timestampToInternal(new Timestamp(assignedWindowStart.get(count))));
+ writer.writeLong(0, assignedWindowStart.get(count));
}
writer.complete();
count++;