Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/05/30 15:27:28 UTC

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4020

    [FLINK-6747] [table] [docs] Time attributes section added

    This documents the Table API's time attributes.
    
    CC @fhueske @alpinegizmo 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-6747

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4020
    
----
commit 4041c56ac00e98745a01b4de05334986545bacdd
Author: twalthr <tw...@apache.org>
Date:   2017-05-30T15:26:07Z

    [FLINK-6747] [table] [docs] Time attributes section added

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119154561
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTimestamp, Username, Data, UserActionTime.rowtime");
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    --- End diff --
    
    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace the first field with a logical event time attribute
    
    (make the same change for scala)



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119153392
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    [...]
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    --- End diff --
    
    The Table API & SQL assume ...
    
    ... and is hidden from the end user of the API.
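
As a rough illustration of why event time gives consistent results for out-of-order events (the property the quoted section describes), here is a minimal sketch in plain Java. It is not Flink code and not part of the pull request. It assumes 10-minute tumbling windows aligned to the epoch; the class `TumblingWindowSketch` and its methods `windowStart` and `assign` are hypothetical names introduced only for this example. Each record is assigned to a window purely from the timestamp it carries, so arrival order does not change the grouping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class for illustration only; not a Flink API.
class TumblingWindowSketch {

    // Start (ms since epoch) of the tumbling window that contains the timestamp.
    // Assumes windows are aligned to the epoch.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups user names by the window that their event timestamp falls into.
    static Map<Long, List<String>> assign(long[] timestamps, String[] users, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(users[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // The third record arrives after the second but carries an earlier timestamp.
        long[] timestamps = {60_000L, 660_000L, 120_000L};
        String[] users = {"alice", "bob", "carol"};
        System.out.println(assign(timestamps, users, tenMinutes));
    }
}
```

Running `main` prints `{0=[alice, carol], 600000=[bob]}`: the late-arriving record for `carol` still lands in the first window because only its own timestamp is consulted. With processing time, the window would instead depend on when the record happened to be handled.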



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119149629
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
    --- End diff --
    
    *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119221597
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
    --- End diff --
    
    "Flink is able to process streaming data based on different notions of time."



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119151823
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    [...]
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
    --- End diff --
    
    As long as a time attribute is not modified and is simply forwarded ...
    
    ... and thus cannot be used for further time-based operations.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119155039
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    [...]
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTimestamp, Username, Data, UserActionTime.rowtime");
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.rowtime)
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with rowtime attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		// ...
    +		// extract timestamp and assign watermarks based on knownledge about stream
    --- End diff --
    
    extract timestamp and assign watermarks based on knowledge of the stream
      
                



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119154720
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTimestamp, Username, Data, UserActionTime.rowtime");
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    --- End diff --
    
    same as above



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119222374
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    --- End diff --
    
    It can be any timestamp that is associated with a row. How about: "Event time refers to the processing of streaming data based on timestamps which are attached to each row. For example, the timestamps can encode when an event happened."
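
To make the suggested wording concrete, here is a minimal plain-Java sketch. It does not use the Flink API, and the names `EventTimeWindows`, `windowStart`, and `countPerWindow` are hypothetical, chosen only for illustration. It assumes non-negative epoch-millisecond timestamps and shows that when rows are grouped into 10-minute tumbling windows by the timestamp attached to each row, the result does not depend on the order in which the rows arrive.

```java
import java.util.Map;
import java.util.TreeMap;

public class EventTimeWindows {

    // Start of the tumbling window containing the given timestamp.
    // Assumes non-negative timestamps in milliseconds (illustrative only).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Counts rows per window using only the timestamp attached to each row,
    // so the arrival order of the rows has no influence on the counts.
    public static Map<Long, Integer> countPerWindow(long[] rowTimestamps, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : rowTimestamps) {
            counts.merge(windowStart(ts, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long[] inOrder = {0L, 60_000L, 660_000L};
        long[] outOfOrder = {660_000L, 0L, 60_000L};
        // Both calls produce the same per-window counts.
        System.out.println(countPerWindow(inOrder, tenMinutes));
        System.out.println(countPerWindow(outOfOrder, tenMinutes));
    }
}
```

A processing-time variant would instead read the machine clock when each row is handled, which is why it cannot give the same guarantee for out-of-order input.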



[GitHub] flink issue #4020: [FLINK-6747] [table] [docs] Time attributes section added

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4020
  
    Thanks for the feedback @alpinegizmo and @fhueske. I hope I addressed all your comments. I will merge it.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119226503
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    --- End diff --
    
    I would move the discussion about where timestamps and watermarks are added to the subsections. 
    
    In "During DataStream-to-Table Conversion", the timestamps and watermarks must already be present in the `DataStream` that is converted. In "Using a TableSource", the timestamps and watermarks must be assigned in the stream that is returned by the `getDataStream()` method.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119227179
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    --- End diff --
    
    I think this is confusing. If the first field carries the timestamp, it could be removed. I'd rather use an example where the DataStream does not contain the timestamp at all, so that we need to extend the schema.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119153620
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    --- End diff --
    
    ... because it is no longer needed after ...



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119153027
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    --- End diff --
    
    Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119226851
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    --- End diff --
    
    I'd remove this and discuss it in the subsections. IMO it is clearer to have timestamp/watermark generation and event-time attribute specification in one place.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119151964
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    --- End diff --
    
    It requires neither timestamp extraction nor watermark generation.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119297963
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    --- End diff --
    
    IMO it is better to discuss it in the super section, because it explains why a TableSource might be useful. But I also added your suggestion.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119153932
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    --- End diff --
    
    extract the timestamp from the first field, and assign watermarks based on knowledge of the stream



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119152127
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    --- End diff --
    
    ... as a processing time attribute


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr closed the pull request at:

    https://github.com/apache/flink/pull/4020


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119222676
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    --- End diff --
    
    "assume that a" -> "require that the"


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119152769
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    --- End diff --
    
    It also ensures replayable results of the table program ...


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119154160
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    --- End diff --
    
    // the first field is still useful, and should be kept
    // declare an additional logical field as an event time attribute


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119152552
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    --- End diff --
    
    define table source with a processing time attribute
    
    (same change again below for Scala)


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119150890
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    --- End diff --
    
    Is it no longer possible to link directly to the windowing section of the table docs?


---

[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119152241
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    --- End diff --
    
    ... as a processing time attribute



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119227533
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTimestamp, Username, Data, UserActionTime.rowtime");
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.rowtime)
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    --- End diff --
    
    Add that the DataStream returned by `getDataStream()` must have timestamps and watermarks assigned.
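
    For readers unfamiliar with what "timestamps and watermarks assigned" means, here is a minimal plain-Java sketch of the idea. It assumes a bounded out-of-orderness strategy and deliberately uses no Flink classes; the class `BoundedWatermarkSketch` and its methods are hypothetical names made up for illustration, not part of the Flink API or of this pull request.

```java
// Plain-Java concept sketch; hypothetical names, not Flink API.
public class BoundedWatermarkSketch {

    // Assumed maximum out-of-orderness in milliseconds (illustrative parameter).
    private final long maxOutOfOrdernessMs;

    // Largest event timestamp observed so far.
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // "Timestamp extraction": each event is represented only by its timestamp here.
    public void onEvent(long eventTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
    }

    // Watermark: events with a timestamp at or below this value are no longer expected.
    public long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    // An event counts as late if its timestamp is not ahead of the current watermark.
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(1000L);
        wm.onEvent(5000L);
        wm.onEvent(4500L); // out of order, but within the assumed bound
        System.out.println(wm.currentWatermark()); // prints 4000
        System.out.println(wm.isLate(3900L));      // prints true
    }
}
```

    In a real `TableSource`, the stream returned by `getDataStream()` would get this information from `assignTimestampsAndWatermarks(...)`, as in the quoted diff. The sketch only shows why a rowtime attribute cannot work without it: event-time windows have no notion of progress unless watermarks advance.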



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119223115
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    --- End diff --
    
    Please add links on "Table API" and "SQL" that point to the respective sections of the documentation.



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119155113
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink, internally, it is treated similar to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and can thus not be used for time-based operations anymore.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It does neither require timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of table program when reading records from persistent storage. 
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamp and watermarks are generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    +
    +There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
    +
    +- Extending the physical schema by an additional logical field
    +- Replacing a physical field by a logical field (e.g. because it is not needed anymore after timestamp extraction).
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTimestamp, Username, Data, UserActionTime.rowtime");
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// extract timestamp from first field and assign watermarks based on knownledge about stream
    +val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    +
    +// Option 1:
    +
    +// the first field has still some value and should be kept
    +// declare an additional logical field as event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.rowtime)
    +
    +
    +// Option 2:
    +
    +// the first field has been used for timestamp extraction and is not necessary anymore
    +// replace first field as logical event time attribute
    +val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with rowtime attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		// ...
    +		// extract timestamp and assign watermarks based on knownledge about stream
    +		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getRowtimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with rowtime attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream 
    +		// ...
    +		// extract timestamp and assign watermarks based on knownledge about stream
    --- End diff --
    
    (same as above)



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119227410
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    +
    +As long as a time attribute is not modified and simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it is materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and thus can no longer be used for time-based operations.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It requires neither timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define table source with processing time attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +	@Override
    +	public TypeInformation<Row> getReturnType() {
    +		String[] names = new String[] {"Username" , "Data"};
    +		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +		return Types.ROW(names, types);
    +	}
    +
    +	@Override
    +	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +		// create stream 
    +		DataStream<Row> stream = ...;
    +		return stream;
    +	}
    +
    +	@Override
    +	public String getProctimeAttribute() {
    +		// field with this name will be appended as a third field 
    +		return "UserActionTime";
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define table source with processing time attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +	override def getReturnType = {
    +		val names = Array[String]("Username" , "Data")
    +		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +		Types.ROW(names, types)
    +	}
    +
    +	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +		// create stream
    +		val stream = ...
    +		stream
    +	}
    +
    +	override def getProctimeAttribute = {
    +		// field with this name will be appended as a third field 
    +		"UserActionTime"
    +	}
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +	.scan("UserActions")
    +	.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in the case of out-of-order or late events. It also ensures replayable results of the table program when reading records from persistent storage.
    +
    +Additionally, event time allows for unified syntax for table programs in both a batch and streaming environment. A time attribute in streaming can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assumes that timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a TableSource with knowledge about the incoming data's characteristics and hidden from the API end user.
    +
    +After timestamps and watermarks have been generated, an event time attribute can be defined in two ways:
    +
    +#### During DataStream-to-Table Conversion
    +
    +The event time attribute is defined with the `.rowtime` property during schema definition. 
    --- End diff --
    
    Add that we expect that the DataStream has timestamps and watermarks assigned.
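
To make concrete what "timestamps and watermarks assigned" means here, the following is a minimal plain-Java sketch of the bounded out-of-orderness idea. It is not Flink API: the class name `BoundedOutOfOrdernessSketch` and its methods are hypothetical and exist only for illustration. It assumes that the watermark trails the highest timestamp seen so far by a fixed bound, and that an event whose timestamp is not greater than the current watermark counts as late.

```java
/**
 * Plain-Java illustration (NOT Flink API, all names are hypothetical) of
 * bounded out-of-orderness watermarking: the watermark trails the highest
 * event timestamp seen so far by a fixed bound.
 */
public class BoundedOutOfOrdernessSketch {

    // Assumed maximum delay (in milliseconds) by which events may arrive out of order.
    private final long maxOutOfOrdernessMs;

    // Highest event timestamp observed so far; Long.MIN_VALUE means "nothing seen yet".
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /**
     * Registers the timestamp of an incoming event.
     *
     * @return true if the event is late, i.e. its timestamp is not greater
     *         than the watermark that was current before the event arrived
     */
    public boolean observe(long eventTimestampMs) {
        boolean late = eventTimestampMs <= currentWatermark();
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
        return late;
    }

    /** Current watermark: highest seen timestamp minus the allowed out-of-orderness. */
    public long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            // No events yet, so there is no progress in event time to report.
            return Long.MIN_VALUE;
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }
}
```

With a bound of one second, observing an event at 5000 ms moves the watermark to 4000 ms; a following event at 4500 ms is still on time, while one at 3000 ms is late. In a real table program this bookkeeping is done by the DataStream API before the stream is converted, which is the precondition the `.rowtime` paragraph should state.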



[GitHub] flink pull request #4020: [FLINK-6747] [table] [docs] Time attributes sectio...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119151221
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
    +- *Event time* is the time that each individual event occurred on its producing device.
    +- *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or pre-defined when using a `TableSource`. Once a time attribute is defined at the beginning, it can be referenced as field and used in time-based operations.
    --- End diff --
    
    They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined, it can be referenced as a field and can be used in time-based operations.

