Posted to commits@flink.apache.org by al...@apache.org on 2016/06/27 10:10:00 UTC
flink git commit: [FLINK-4121] Add timeunit (ms) to docs for
timestamps and watermarks
Repository: flink
Updated Branches:
refs/heads/master d34bdaf7f -> 35b4da273
[FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks
This closes #2165
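As an illustration of the unit this commit documents (milliseconds since 1970-01-01T00:00:00Z), here is a minimal sketch of normalizing other time representations to epoch milliseconds before they are used as event timestamps or watermarks. The class `EpochMillis` and its helper methods are hypothetical names chosen for this example and are not part of Flink; only `java.time.Instant` and `Math.multiplyExact` from the JDK are used.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: converts common time
// representations to milliseconds since 1970-01-01T00:00:00Z.
final class EpochMillis {

    /** Converts seconds since the epoch to milliseconds, failing on overflow. */
    public static long fromSeconds(long epochSeconds) {
        return Math.multiplyExact(epochSeconds, 1000L);
    }

    /** Parses an ISO-8601 instant such as "1970-01-01T00:00:01Z" to epoch milliseconds. */
    public static long fromIso(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(fromIso("1970-01-01T00:00:00Z")); // the epoch itself
        System.out.println(fromIso("1970-01-01T00:00:01Z")); // one second after the epoch
        System.out.println(fromSeconds(1L));                 // same instant, from seconds
    }
}
```

A timestamp extractor that reads a seconds-based field could, under these assumptions, return `EpochMillis.fromSeconds(value)` so that the value matches the unit expected for timestamps and watermarks.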
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35b4da27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35b4da27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35b4da27
Branch: refs/heads/master
Commit: 35b4da273f755308b82468ced539c380444efd37
Parents: d34bdaf
Author: Jonas Traub <jo...@s-traub.com>
Authored: Mon Jun 27 11:06:16 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 27 12:09:06 2016 +0200
----------------------------------------------------------------------
.../streaming/event_timestamps_watermarks.md | 33 ++++++++++----------
.../streaming/api/watermark/Watermark.java | 4 +--
2 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
index 493e11a..05c9f51 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -48,14 +48,13 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
</div>
</div>
-
## Assigning Timestamps
In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
stream needs to get its event timestamp *assigned*. That happens usually by accessing/extracting the
timestamp from some field in the element.
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
the progress in event time.
There are two ways to assign timestamps and generate Watermarks:
@@ -63,6 +62,8 @@ There are two ways to assign timestamps and generate Watermarks:
1. Directly in the data stream source
2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted
+<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
+milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
### Source Functions with Timestamps and Watermarks
@@ -116,18 +117,18 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
-The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
+The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter inside
-the source (or consumer) itself. More information on how to do so can be found in the
-[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
+(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter inside
+the source (or consumer) itself. More information on how to do so can be found in the
+[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
**NOTE:** The remainder of this section presents the main interfaces a programmer has
-to implement in order to create her own timestamp extractors/watermark emitters.
-To see the pre-implemented extractors that ship with Flink, please refer to the
+to implement in order to create her own timestamp extractors/watermark emitters.
+To see the pre-implemented extractors that ship with Flink, please refer to the
[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page.
<div class="codetabs" markdown="1">
@@ -137,7 +138,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
- myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+ myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
@@ -157,7 +158,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
- myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+ myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter());
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
@@ -176,7 +177,7 @@ withTimestampsAndWatermarks
#### **With Periodic Watermarks**
-The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
+The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
on the stream elements, or purely based on processing time).
The interval (every *n* milliseconds) in which the watermark will be generated is defined via
@@ -202,7 +203,7 @@ public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermar
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
- long timestamp = element.getCreationTime();
+ long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@@ -229,7 +230,7 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My
@Override
public Watermark getCurrentWatermark() {
- // return the watermark as current time minus the maximum time lag
+ // return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
@@ -249,7 +250,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
- val timestamp = element.getCreationTime()
+ val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
@@ -273,7 +274,7 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
}
override def getCurrentWatermark(): Watermark = {
- // return the watermark as current time minus the maximum time lag
+ // return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index cb9eb99..dc12d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -48,11 +48,11 @@ public final class Watermark extends StreamElement {
// ------------------------------------------------------------------------
- /** The timestamp of the watermark */
+ /** The timestamp of the watermark in milliseconds */
private final long timestamp;
/**
- * Creates a new watermark with the given timestamp.
+ * Creates a new watermark with the given timestamp in milliseconds.
*/
public Watermark(long timestamp) {
this.timestamp = timestamp;