You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/16 13:26:13 UTC

[4/5] flink git commit: [FLINK-8069] [table] Add preserving WatermarkStrategy.

[FLINK-8069] [table] Add preserving WatermarkStrategy.

This closes #5016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd1fbc07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd1fbc07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd1fbc07

Branch: refs/heads/master
Commit: cd1fbc0789d039e38c10d0a978137156aacbf186
Parents: c697bc1
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 15 11:01:13 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 16 11:32:12 2017 +0100

----------------------------------------------------------------------
 .../datastream/StreamTableSourceScan.scala      |  5 +-
 .../wmstrategies/watermarkStrategies.scala      |  6 +++
 .../stream/table/TableSourceITCase.scala        | 51 ++++++++++++++++++++
 .../flink/table/utils/testTableSources.scala    | 28 ++++++++++-
 4 files changed, 88 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5d305b4..9179d4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources._
-import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -134,6 +134,9 @@ class StreamTableSourceScan(
         case p: PunctuatedWatermarkAssigner =>
           val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
           ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case _: PreserveWatermarks =>
+          // The watermarks have already been provided by the underlying DataStream.
+          ingestedTable
       }
     } else {
       // No need to generate watermarks if no rowtime attribute is specified.

http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 0dd82f1..4c7f4e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
     */
   def getWatermark(row: Row, timestamp: Long): Watermark
 }
+
+/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
+class PreserveWatermarks extends WatermarkStrategy
+object PreserveWatermarks {
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 77c1e08..c9ea30a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.utils._
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
@@ -690,4 +692,53 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       "3,Mike,30000,true,3000")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testRowtimeTableSourcePreserveWatermarks(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // rows with timestamps and watermarks
+    val data = Seq(
+      Right(1L),
+      Left(5L, Row.of(new JInt(1), new JLong(5), "A")),
+      Left(2L, Row.of(new JInt(2), new JLong(1), "B")),
+      Right(10L),
+      Left(8L, Row.of(new JInt(6), new JLong(8), "C")),
+      Right(20L),
+      Left(21L, Row.of(new JInt(6), new JLong(21), "D")),
+      Right(30L)
+    )
+
+    val fieldNames = Array("id", "rtime", "name")
+    val schema = new TableSchema(fieldNames, Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
+    val rowType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestPreserveWMTableSource(schema, rowType, data, "rtime")
+    tEnv.registerTableSource(tableName, tableSource)
+
+    tEnv.scan(tableName)
+      .where('rtime.cast(Types.LONG) > 3L)
+      .select('id, 'name)
+      .toAppendStream[Row]
+      // append current watermark to each row to verify that original watermarks were preserved
+      .process(new ProcessFunction[Row, (Row, Long)] {
+        override def processElement(
+            value: Row,
+            ctx: ProcessFunction[Row, (Row, Long)]#Context,
+            out: Collector[(Row, Long)]): Unit = {
+          out.collect(value, ctx.timerService().currentWatermark())
+        }
+      })
+      .addSink(new StreamITCase.StringSink[(Row, Long)])
+    env.execute()
+
+    val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index a546919..f7263c8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -27,9 +27,10 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -199,3 +200,28 @@ class TestNestedProjectableTableSource(
       s"read nested fields: ${readNestedFields.mkString(", ")})"
   }
 }
+
+class TestPreserveWMTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[Either[(Long, T), Long]],
+    rowtime: String)
+  extends StreamTableSource[T]
+    with DefinedRowtimeAttributes {
+
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    Collections.singletonList(new RowtimeAttributeDescriptor(
+      rowtime,
+      new ExistingField(rowtime),
+      PreserveWatermarks.INSTANCE))
+  }
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    execEnv.addSource(new EventTimeSourceFunction[T](values)).setParallelism(1).returns(returnType)
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+
+}