You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/19 06:49:27 UTC
[flink] branch release-1.6 updated: [FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 50c0fb8 [FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor
50c0fb8 is described below
commit 50c0fb8d22cc8e9b58bfda1cb4381f7b3e34a901
Author: jrthe42 <jr...@gmail.com>
AuthorDate: Fri Aug 17 15:59:05 2018 +0800
[FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor
This closes #6575.
---
.../flink/table/descriptors/RowtimeValidator.scala | 2 +-
.../flink/table/descriptors/RowtimeTest.scala | 59 ++++++++++++++++++++--
.../table/descriptors/SchemaValidatorTest.scala | 22 +++++++-
3 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
index 160347e..4bd51c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -177,7 +177,7 @@ object RowtimeValidator {
case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
val clazz = properties.getClass(
- ROWTIME_TIMESTAMPS_CLASS,
+ prefix + ROWTIME_TIMESTAMPS_CLASS,
classOf[TimestampExtractor])
DescriptorProperties.deserialize(
properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index d5930fa..199d416 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -20,9 +20,12 @@ package org.apache.flink.table.descriptors
import java.util
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor}
+import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
import org.apache.flink.types.Row
import org.junit.Test
@@ -57,7 +60,11 @@ class RowtimeTest extends DescriptorTestBase {
.timestampsFromSource()
.watermarksFromStrategy(new CustomAssigner())
- util.Arrays.asList(desc1, desc2)
+ val desc3 = Rowtime()
+ .timestampsFromExtractor(new CustomExtractor("tsField"))
+ .watermarksPeriodicBounded(1000L)
+
+ util.Arrays.asList(desc1, desc2, desc3)
}
override def validator(): DescriptorValidator = {
@@ -83,7 +90,19 @@ class RowtimeTest extends DescriptorTestBase {
"F0ZWd5mB_uSxDZ8-MCAAB4cA")
)
- util.Arrays.asList(props1.asJava, props2.asJava)
+ val props3 = Map(
+ "rowtime.timestamps.type" -> "custom",
+ "rowtime.timestamps.class" -> ("org.apache.flink.table.descriptors." +
+ "RowtimeTest$CustomExtractor"),
+ "rowtime.timestamps.serialized" -> ("rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3" +
+ "JzLlJvd3RpbWVUZXN0JEN1c3RvbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cm" +
+ "luZzt4cgA-b3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYW" +
+ "N0b3LU8E2thK4wMQIAAHhwdAAHdHNGaWVsZA"),
+ "rowtime.watermarks.type" -> "periodic-bounded",
+ "rowtime.watermarks.delay" -> "1000"
+ )
+
+ util.Arrays.asList(props1.asJava, props2.asJava, props3.asJava)
}
}
@@ -93,4 +112,36 @@ object RowtimeTest {
override def getWatermark(row: Row, timestamp: Long): Watermark =
throw new UnsupportedOperationException()
}
+
+ class CustomExtractor(val field: String) extends TimestampExtractor {
+ def this() = {
+ this("ts")
+ }
+
+ override def getArgumentFields: Array[String] = Array(field)
+
+ override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+ argumentFieldTypes(0) match {
+ case Types.SQL_TIMESTAMP =>
+ case _ =>
+ throw ValidationException(
+ s"Field 'ts' must be of type Timestamp but is of type ${argumentFieldTypes(0)}.")
+ }
+ }
+
+ override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+ val fieldAccess: Expression = fieldAccesses(0)
+ require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
+ Cast(fieldAccess, Types.LONG)
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case that: CustomExtractor => field == that.field
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ field.hashCode
+ }
+ }
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index a2eec4c..b5c664f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -21,8 +21,9 @@ package org.apache.flink.table.descriptors
import java.util.Optional
import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.descriptors.RowtimeTest.CustomExtractor
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp}
-import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
+import org.apache.flink.table.sources.wmstrategies.{BoundedOutOfOrderTimestamps, PreserveWatermarks}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test
@@ -159,4 +160,23 @@ class SchemaValidatorTest {
.build()
assertEquals(expectedFormatSchema, formatSchema)
}
+
+ @Test
+ def testSchemaWithRowtimeCustomTimestampExtractor(): Unit = {
+ val descriptor = Schema()
+ .field("f1", Types.STRING)
+ .field("f2", Types.STRING)
+ .field("f3", Types.SQL_TIMESTAMP)
+ .field("rt", Types.SQL_TIMESTAMP).rowtime(
+ Rowtime().timestampsFromExtractor(new CustomExtractor("f3"))
+ .watermarksPeriodicBounded(1000L))
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+
+ val rowtime = SchemaValidator.deriveRowtimeAttributes(properties).get(0)
+ assertEquals("rt", rowtime.getAttributeName)
+ val extractor = rowtime.getTimestampExtractor
+ assertTrue(extractor.equals(new CustomExtractor("f3")))
+ assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[BoundedOutOfOrderTimestamps])
+ }
}