You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/14 13:19:08 UTC

[flink] branch release-1.6 updated: [FLINK-11304][docs][table] Fix typos in time attributes doc

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new c9c2fa4  [FLINK-11304][docs][table] Fix typos in time attributes doc
c9c2fa4 is described below

commit c9c2fa49039ca2d00ee0cf770185a2284b4be9fd
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jan 14 14:16:18 2019 +0100

    [FLINK-11304][docs][table] Fix typos in time attributes doc
---
 docs/dev/table/streaming.md | 39 +++++++++++++++++++++++++++------------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index d0fea47..30ea24d 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -412,27 +412,30 @@ val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'u
 
 #### Using a TableSource
 
-The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The `getRowtimeAttribute()` method returns the name of an existing field that carries the event time attribute of the table and is of type `LONG` or `TIMESTAMP`.
+The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttributes` interface. The `getRowtimeAttributeDescriptors()` method returns a list of `RowtimeAttributeDescriptor` objects, each of which describes the final name of a time attribute, a timestamp extractor to derive the values of the attribute, and the watermark strategy associated with the attribute.
 
-Moreover, the `DataStream` returned by the `getDataStream()` method must have watermarks assigned that are aligned with the defined time attribute. Please note that the timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime attribute are relevant.
+Please make sure that the `DataStream` returned by the `getDataStream()` method is aligned with the defined time attribute.
+The timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are only considered if a `StreamRecordTimestamp` timestamp extractor is defined.
+Watermarks of a `DataStream` are only preserved if a `PreserveWatermarks` watermark strategy is defined.
+Otherwise, only the values of the `TableSource`'s rowtime attribute are relevant.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // define a table source with a rowtime attribute
-public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
+public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
 
 	@Override
 	public TypeInformation<Row> getReturnType() {
 		String[] names = new String[] {"Username", "Data", "UserActionTime"};
-		TypeInformation[] types = 
-		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
+		TypeInformation[] types =
+			new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
 		return Types.ROW(names, types);
 	}
 
 	@Override
 	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-		// create stream 
+		// create stream
 		// ...
 		// assign watermarks based on the "UserActionTime" attribute
 		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
@@ -440,9 +443,15 @@ public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeA
 	}
 
 	@Override
-	public String getRowtimeAttribute() {
+	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
 		// Mark the "UserActionTime" attribute as event-time attribute.
-		return "UserActionTime";
+		// We create one attribute descriptor of "UserActionTime".
+		RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
+			"UserActionTime",
+			new ExistingField("UserActionTime"),
+			new AscendingTimestamps());
+		List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
+		return listRowtimeAttrDescr;
 	}
 }
 
@@ -457,7 +466,7 @@ WindowedTable windowedTable = tEnv
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // define a table source with a rowtime attribute
-class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
 
 	override def getReturnType = {
 		val names = Array[String]("Username" , "Data", "UserActionTime")
@@ -466,16 +475,22 @@ class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribu
 	}
 
 	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-		// create stream 
+		// create stream
 		// ...
 		// assign watermarks based on the "UserActionTime" attribute
 		val stream = inputStream.assignTimestampsAndWatermarks(...)
 		stream
 	}
 
-	override def getRowtimeAttribute = {
+	override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
 		// Mark the "UserActionTime" attribute as event-time attribute.
-		"UserActionTime"
+		// We create one attribute descriptor of "UserActionTime".
+		val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
+			"UserActionTime",
+			new ExistingField("UserActionTime"),
+			new AscendingTimestamps)
+		val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
+		listRowtimeAttrDescr
 	}
 }