You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/19 06:49:27 UTC

[flink] branch release-1.6 updated: [FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 50c0fb8  [FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor
50c0fb8 is described below

commit 50c0fb8d22cc8e9b58bfda1cb4381f7b3e34a901
Author: jrthe42 <jr...@gmail.com>
AuthorDate: Fri Aug 17 15:59:05 2018 +0800

    [FLINK-10169] [table] Fix error in RowtimeValidator when getting custom TimestampExtractor
    
    This closes #6575.
---
 .../flink/table/descriptors/RowtimeValidator.scala |  2 +-
 .../flink/table/descriptors/RowtimeTest.scala      | 59 ++++++++++++++++++++--
 .../table/descriptors/SchemaValidatorTest.scala    | 22 +++++++-
 3 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
index 160347e..4bd51c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -177,7 +177,7 @@ object RowtimeValidator {
 
       case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
         val clazz = properties.getClass(
-          ROWTIME_TIMESTAMPS_CLASS,
+          prefix + ROWTIME_TIMESTAMPS_CLASS,
           classOf[TimestampExtractor])
         DescriptorProperties.deserialize(
           properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index d5930fa..199d416 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -20,9 +20,12 @@ package org.apache.flink.table.descriptors
 
 import java.util
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor}
+import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
@@ -57,7 +60,11 @@ class RowtimeTest extends DescriptorTestBase {
       .timestampsFromSource()
       .watermarksFromStrategy(new CustomAssigner())
 
-    util.Arrays.asList(desc1, desc2)
+    val desc3 = Rowtime()
+      .timestampsFromExtractor(new CustomExtractor("tsField"))
+      .watermarksPeriodicBounded(1000L)
+
+    util.Arrays.asList(desc1, desc2, desc3)
   }
 
   override def validator(): DescriptorValidator = {
@@ -83,7 +90,19 @@ class RowtimeTest extends DescriptorTestBase {
         "F0ZWd5mB_uSxDZ8-MCAAB4cA")
     )
 
-    util.Arrays.asList(props1.asJava, props2.asJava)
+    val props3 = Map(
+      "rowtime.timestamps.type" -> "custom",
+      "rowtime.timestamps.class" -> ("org.apache.flink.table.descriptors." +
+        "RowtimeTest$CustomExtractor"),
+      "rowtime.timestamps.serialized" -> ("rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3" +
+        "JzLlJvd3RpbWVUZXN0JEN1c3RvbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cm" +
+        "luZzt4cgA-b3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYW" +
+        "N0b3LU8E2thK4wMQIAAHhwdAAHdHNGaWVsZA"),
+      "rowtime.watermarks.type" -> "periodic-bounded",
+      "rowtime.watermarks.delay" -> "1000"
+    )
+
+    util.Arrays.asList(props1.asJava, props2.asJava, props3.asJava)
   }
 }
 
@@ -93,4 +112,36 @@ object RowtimeTest {
     override def getWatermark(row: Row, timestamp: Long): Watermark =
       throw new UnsupportedOperationException()
   }
+
+  class CustomExtractor(val field: String) extends TimestampExtractor {
+    def this() = {
+      this("ts")
+    }
+
+    override def getArgumentFields: Array[String] = Array(field)
+
+    override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+      argumentFieldTypes(0) match {
+        case Types.SQL_TIMESTAMP =>
+        case _ =>
+          throw ValidationException(
+            s"Field 'ts' must be of type Timestamp but is of type ${argumentFieldTypes(0)}.")
+      }
+    }
+
+    override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+      val fieldAccess: Expression = fieldAccesses(0)
+      require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
+      Cast(fieldAccess, Types.LONG)
+    }
+
+    override def equals(other: Any): Boolean = other match {
+      case that: CustomExtractor => field == that.field
+      case _ => false
+    }
+
+    override def hashCode(): Int = {
+      field.hashCode
+    }
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index a2eec4c..b5c664f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -21,8 +21,9 @@ package org.apache.flink.table.descriptors
 import java.util.Optional
 
 import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.descriptors.RowtimeTest.CustomExtractor
 import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp}
-import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
+import org.apache.flink.table.sources.wmstrategies.{BoundedOutOfOrderTimestamps, PreserveWatermarks}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
@@ -159,4 +160,23 @@ class SchemaValidatorTest {
       .build()
     assertEquals(expectedFormatSchema, formatSchema)
   }
+
+  @Test
+  def testSchemaWithRowtimeCustomTimestampExtractor(): Unit = {
+    val descriptor = Schema()
+      .field("f1", Types.STRING)
+      .field("f2", Types.STRING)
+      .field("f3", Types.SQL_TIMESTAMP)
+      .field("rt", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromExtractor(new CustomExtractor("f3"))
+          .watermarksPeriodicBounded(1000L))
+    val properties = new DescriptorProperties()
+    descriptor.addProperties(properties)
+
+    val rowtime = SchemaValidator.deriveRowtimeAttributes(properties).get(0)
+    assertEquals("rt", rowtime.getAttributeName)
+    val extractor = rowtime.getTimestampExtractor
+    assertTrue(extractor.equals(new CustomExtractor("f3")))
+    assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[BoundedOutOfOrderTimestamps])
+  }
 }