You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/16 13:26:58 UTC
[5/5] flink git commit: [FLINK-8069] [table] Add preserving
WatermarkStrategy.
[FLINK-8069] [table] Add preserving WatermarkStrategy.
This closes #5016.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7f7d0c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7f7d0c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7f7d0c9
Branch: refs/heads/release-1.4
Commit: e7f7d0c9333f3f9db488c5f968a1627401485067
Parents: f14fcef
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 15 11:01:13 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 16 11:47:16 2017 +0100
----------------------------------------------------------------------
.../datastream/StreamTableSourceScan.scala | 5 +-
.../wmstrategies/watermarkStrategies.scala | 6 +++
.../stream/table/TableSourceITCase.scala | 51 ++++++++++++++++++++
.../flink/table/utils/testTableSources.scala | 28 ++++++++++-
4 files changed, 88 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5d305b4..9179d4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sources._
-import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -134,6 +134,9 @@ class StreamTableSourceScan(
case p: PunctuatedWatermarkAssigner =>
val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ case _: PreserveWatermarks =>
+ // The watermarks have already been provided by the underlying DataStream.
+ ingestedTable
}
} else {
// No need to generate watermarks if no rowtime attribute is specified.
http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 0dd82f1..4c7f4e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
*/
def getWatermark(row: Row, timestamp: Long): Watermark
}
+
+/**
+  * Watermark strategy stating that no watermarks are assigned for the rowtime attribute;
+  * the watermarks already carried by the underlying DataStream are kept as they are.
+  */
+class PreserveWatermarks extends WatermarkStrategy
+
+/** Companion of [[PreserveWatermarks]] that exposes a shared, reusable instance. */
+object PreserveWatermarks {
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 77c1e08..c9ea30a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.utils._
import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
@@ -690,4 +692,53 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
"3,Mike,30000,true,3000")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+  /**
+    * Checks that a table source whose rowtime attribute uses the PreserveWatermarks strategy
+    * forwards the watermarks of its underlying DataStream instead of generating new ones.
+    */
+  @Test
+  def testRowtimeTableSourcePreserveWatermarks(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // Rows with timestamps, written as Left((timestamp, row)), interleaved with
+    // watermarks, written as Right(watermark). The explicit element type and the
+    // tuple parentheses avoid relying on argument auto-tupling.
+    val data: Seq[Either[(Long, Row), Long]] = Seq(
+      Right(1L),
+      Left((5L, Row.of(new JInt(1), new JLong(5), "A"))),
+      Left((2L, Row.of(new JInt(2), new JLong(1), "B"))),
+      Right(10L),
+      Left((8L, Row.of(new JInt(6), new JLong(8), "C"))),
+      Right(20L),
+      Left((21L, Row.of(new JInt(6), new JLong(21), "D"))),
+      Right(30L)
+    )
+
+    val fieldNames = Array("id", "rtime", "name")
+    val schema = new TableSchema(fieldNames, Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
+    val rowType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestPreserveWMTableSource(schema, rowType, data, "rtime")
+    tEnv.registerTableSource(tableName, tableSource)
+
+    tEnv.scan(tableName)
+      .where('rtime.cast(Types.LONG) > 3L)
+      .select('id, 'name)
+      .toAppendStream[Row]
+      // append current watermark to each row to verify that original watermarks were preserved
+      .process(new ProcessFunction[Row, (Row, Long)] {
+        override def processElement(
+            value: Row,
+            ctx: ProcessFunction[Row, (Row, Long)]#Context,
+            out: Collector[(Row, Long)]): Unit = {
+          // Emit an explicit tuple rather than relying on auto-tupling of the two arguments.
+          out.collect((value, ctx.timerService().currentWatermark()))
+        }
+      })
+      .addSink(new StreamITCase.StringSink[(Row, Long)])
+    env.execute()
+
+    // Row B (rtime 1) is removed by the filter; each remaining row is paired with the
+    // last watermark the source emitted before it (1, 10 and 20 respectively).
+    val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index a546919..f7263c8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -27,9 +27,10 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -199,3 +200,28 @@ class TestNestedProjectableTableSource(
s"read nested fields: ${readNestedFields.mkString(", ")})"
}
}
+
+/**
+  * Test [[StreamTableSource]] whose rowtime attribute is declared with the
+  * [[PreserveWatermarks]] strategy, so the watermarks emitted by the returned DataStream
+  * are used as-is instead of being generated by the table scan.
+  *
+  * @param tableSchema schema of the table exposed by this source
+  * @param returnType  type information of the records produced by the DataStream
+  * @param values      records as Left((timestamp, record)) and watermarks as Right(watermark),
+  *                    handed to an [[EventTimeSourceFunction]]
+  * @param rowtime     name of the existing field that holds the rowtime attribute
+  * @tparam T          record type
+  */
+class TestPreserveWMTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[Either[(Long, T), Long]],
+    rowtime: String)
+  extends StreamTableSource[T]
+  with DefinedRowtimeAttributes {
+
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    Collections.singletonList(new RowtimeAttributeDescriptor(
+      rowtime,
+      new ExistingField(rowtime),
+      PreserveWatermarks.INSTANCE))
+  }
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    // Hand the type information to addSource directly: the single-argument overload first
+    // attempts reflective extraction for the generic source function (which cannot succeed
+    // for T) and would only be repaired afterwards by returns().
+    // Parallelism 1 runs exactly one source instance over the given values.
+    execEnv
+      .addSource(new EventTimeSourceFunction[T](values), returnType)
+      .setParallelism(1)
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+
+}