Posted to commits@flink.apache.org by al...@apache.org on 2016/06/27 10:10:00 UTC

flink git commit: [FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks

Repository: flink
Updated Branches:
  refs/heads/master d34bdaf7f -> 35b4da273


[FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks

This closes #2165


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35b4da27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35b4da27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35b4da27

Branch: refs/heads/master
Commit: 35b4da273f755308b82468ced539c380444efd37
Parents: d34bdaf
Author: Jonas Traub <jo...@s-traub.com>
Authored: Mon Jun 27 11:06:16 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 27 12:09:06 2016 +0200

----------------------------------------------------------------------
 .../streaming/event_timestamps_watermarks.md    | 33 ++++++++++----------
 .../streaming/api/watermark/Watermark.java      |  4 +--
 2 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
index 493e11a..05c9f51 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -48,14 +48,13 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 </div>
 </div>
 
-
 ## Assigning Timestamps
 
 In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
 stream needs to get its event timestamp *assigned*. That happens usually by accessing/extracting the
 timestamp from some field in the element.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about 
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
 the progress in event time.
 
 There are two ways to assign timestamps and generate Watermarks:
@@ -63,6 +62,8 @@ There are two ways to assign timestamps and generate Watermarks:
   1. Directly in the data stream source
   2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted
 
+<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
+milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
 
 ### Source Functions with Timestamps and Watermarks
 
@@ -116,18 +117,18 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
 Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
 original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
 
-The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so. 
+The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
 A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
 In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). As a special case, when using Kafka as the source of a streaming job, 
-Flink allows the specification of a timestamp assigner / watermark emitter inside 
-the source (or consumer) itself. More information on how to do so can be found in the 
-[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html). 
+(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter inside
+the source (or consumer) itself. More information on how to do so can be found in the
+[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
 
 
 **NOTE:** The remainder of this section presents the main interfaces a programmer has
-to implement in order to create her own timestamp extractors/watermark emitters. 
-To see the pre-implemented extractors that ship with Flink, please refer to the 
+to implement in order to create her own timestamp extractors/watermark emitters.
+To see the pre-implemented extractors that ship with Flink, please refer to the
 [Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page.
 
 <div class="codetabs" markdown="1">
@@ -137,7 +138,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 DataStream<MyEvent> stream = env.readFile(
-        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter(), typeInfo);
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
@@ -157,7 +158,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
 val stream: DataStream[MyEvent] = env.readFile(
-         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
          FilePathFilter.createDefaultFilter());
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
@@ -176,7 +177,7 @@ withTimestampsAndWatermarks
 
 #### **With Periodic Watermarks**
 
-The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending 
+The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
 on the stream elements, or purely based on processing time).
 
 The interval (every *n* milliseconds) in which the watermark will be generated is defined via
@@ -202,7 +203,7 @@ public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermar
 
     @Override
     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-        long timestamp = element.getCreationTime(); 
+        long timestamp = element.getCreationTime();
         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
         return timestamp;
     }
@@ -229,7 +230,7 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My
 
 	@Override
 	public Watermark getCurrentWatermark() {
-		// return the watermark as current time minus the maximum time lag 
+		// return the watermark as current time minus the maximum time lag
 		return new Watermark(System.currentTimeMillis() - maxTimeLag);
 	}
 }
@@ -249,7 +250,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
     var currentMaxTimestamp: Long;
 
     override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        val timestamp = element.getCreationTime() 
+        val timestamp = element.getCreationTime()
         currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
         timestamp;
     }
@@ -273,7 +274,7 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
     }
 
     override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag 
+        // return the watermark as current time minus the maximum time lag
         new Watermark(System.currentTimeMillis() - maxTimeLag)
     }
 }

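The periodic assigner in the docs tracks the largest event timestamp seen and derives the watermark from it. Because both values are milliseconds, any out-of-orderness bound must be in milliseconds too. The following is a minimal plain-Java sketch of that arithmetic with no Flink dependency. The class `BoundedOutOfOrdernessSketch`, its methods, and the 3500 ms bound are hypothetical and only mirror the roles of `extractTimestamp` and `getCurrentWatermark`. The sketch also assumes the watermark is computed as `currentMaxTimestamp - bound`.

```java
/**
 * Hypothetical plain-Java sketch of bounded-out-of-orderness watermarking.
 * All timestamps and bounds are milliseconds since 1970-01-01T00:00:00Z.
 */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    // Start just high enough that subtracting the bound cannot underflow.
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Plays the role of extractTimestamp: remember the highest event time seen.
    long onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
        return eventTimestampMillis;
    }

    // Plays the role of getCurrentWatermark: events older than this are considered late.
    long currentWatermarkMillis() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3500L); // 3.5 s
        gen.onEvent(10_000L);
        gen.onEvent(8_000L);  // an out-of-order event does not lower the maximum
        gen.onEvent(12_000L);
        System.out.println(gen.currentWatermarkMillis()); // 8500
    }
}
```

Initializing the maximum to `Long.MIN_VALUE + bound` is one way to keep the watermark from wrapping around before the first event arrives. If the bound were mistakenly given in seconds (e.g. `3` instead of `3500`), the watermark would trail the data by only 3 ms.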
http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index cb9eb99..dc12d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -48,11 +48,11 @@ public final class Watermark extends StreamElement {
 	
 	// ------------------------------------------------------------------------
 	
-	/** The timestamp of the watermark */
+	/** The timestamp of the watermark in milliseconds */
 	private final long timestamp;
 
 	/**
-	 * Creates a new watermark with the given timestamp.
+	 * Creates a new watermark with the given timestamp in milliseconds.
 	 */
 	public Watermark(long timestamp) {
 		this.timestamp = timestamp;
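The `Watermark(long timestamp)` constructor now documents its argument as milliseconds. Event times that arrive as date strings or as epoch seconds therefore have to be converted before they are returned from `extractTimestamp(...)` or passed to `new Watermark(...)`. The following is a minimal sketch using only `java.time` and `java.util.concurrent.TimeUnit`. The class `EpochMillisExample` and its helpers are hypothetical. The sample value is this commit's author date, `Mon Jun 27 11:06:16 2016 +0200`, written in ISO-8601 form.

```java
import java.time.OffsetDateTime;
import java.util.concurrent.TimeUnit;

public class EpochMillisExample {

    // Hypothetical helper: parses an ISO-8601 date-time with offset,
    // e.g. "2016-06-27T11:06:16+02:00", into milliseconds since the epoch.
    static long toEpochMillis(String isoDateTimeWithOffset) {
        return OffsetDateTime.parse(isoDateTimeWithOffset).toInstant().toEpochMilli();
    }

    // Hypothetical helper: scales a timestamp stored in whole seconds up to
    // milliseconds. Passing raw seconds would be read as a time in early 1970.
    static long secondsToMillis(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    public static void main(String[] args) {
        long fromIso = toEpochMillis("2016-06-27T11:06:16+02:00");
        long fromSeconds = secondsToMillis(1467018376L);
        System.out.println(fromIso);                // 1467018376000
        System.out.println(fromIso == fromSeconds); // true
    }
}
```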