You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/14 13:19:08 UTC
[flink] branch release-1.6 updated: [FLINK-11304][docs][table] Fix typos in time attributes doc
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new c9c2fa4 [FLINK-11304][docs][table] Fix typos in time attributes doc
c9c2fa4 is described below
commit c9c2fa49039ca2d00ee0cf770185a2284b4be9fd
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jan 14 14:16:18 2019 +0100
[FLINK-11304][docs][table] Fix typos in time attributes doc
---
docs/dev/table/streaming.md | 39 +++++++++++++++++++++++++++------------
1 file changed, 27 insertions(+), 12 deletions(-)
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index d0fea47..30ea24d 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -412,27 +412,30 @@ val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'u
#### Using a TableSource
-The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The `getRowtimeAttribute()` method returns the name of an existing field that carries the event time attribute of the table and is of type `LONG` or `TIMESTAMP`.
+The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttributes` interface. The `getRowtimeAttributeDescriptors()` method returns a list of `RowtimeAttributeDescriptor` objects, each of which describes the final name of a time attribute, a timestamp extractor to derive the values of the attribute, and the watermark strategy associated with the attribute.
-Moreover, the `DataStream` returned by the `getDataStream()` method must have watermarks assigned that are aligned with the defined time attribute. Please note that the timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime attribute are relevant.
+Please make sure that the `DataStream` returned by the `getDataStream()` method is aligned with the defined time attribute.
+The timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are only considered if a `StreamRecordTimestamp` timestamp extractor is defined.
+Watermarks of a `DataStream` are only preserved if a `PreserveWatermarks` watermark strategy is defined.
+Otherwise, only the values of the `TableSource`'s rowtime attribute are relevant.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// define a table source with a rowtime attribute
-public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
+public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"Username", "Data", "UserActionTime"};
- TypeInformation[] types =
- new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
+ TypeInformation[] types =
+ new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
- // create stream
+ // create stream
// ...
// assign watermarks based on the "UserActionTime" attribute
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
@@ -440,9 +443,15 @@ public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeA
}
@Override
- public String getRowtimeAttribute() {
+ public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// Mark the "UserActionTime" attribute as event-time attribute.
- return "UserActionTime";
+ // We create one attribute descriptor of "UserActionTime".
+ RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
+ "UserActionTime",
+ new ExistingField("UserActionTime"),
+ new AscendingTimestamps());
+ List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
+ return listRowtimeAttrDescr;
}
}
@@ -457,7 +466,7 @@ WindowedTable windowedTable = tEnv
<div data-lang="scala" markdown="1">
{% highlight scala %}
// define a table source with a rowtime attribute
-class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
override def getReturnType = {
val names = Array[String]("Username" , "Data", "UserActionTime")
@@ -466,16 +475,22 @@ class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribu
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
- // create stream
+ // create stream
// ...
// assign watermarks based on the "UserActionTime" attribute
val stream = inputStream.assignTimestampsAndWatermarks(...)
stream
}
- override def getRowtimeAttribute = {
+ override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
// Mark the "UserActionTime" attribute as event-time attribute.
- "UserActionTime"
+ // We create one attribute descriptor of "UserActionTime".
+ val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
+ "UserActionTime",
+ new ExistingField("UserActionTime"),
+ new AscendingTimestamps)
+ val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
+ listRowtimeAttrDescr
}
}