Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/22 16:32:29 UTC

flink git commit: [FLINK-3752] Add Per-Kafka-Partition Watermark Generation to the docs

Repository: flink
Updated Branches:
  refs/heads/master a973d84b2 -> 7ec6d7b55


[FLINK-3752] Add Per-Kafka-Partition Watermark Generation to the docs

This closes #2142


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ec6d7b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ec6d7b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ec6d7b5

Branch: refs/heads/master
Commit: 7ec6d7b55a65258079e623b5f4394c4deeae2ad8
Parents: a973d84
Author: kl0u <kk...@gmail.com>
Authored: Mon Jun 20 16:47:03 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jun 22 18:32:09 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md         | 54 ++++++++++++++++++++
 .../streaming/event_timestamps_watermarks.md    | 18 +++++--
 .../streaming/api/datastream/DataStream.java    |  4 +-
 3 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ec6d7b5/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
index 45ec6ad..9bd70be 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -198,6 +198,60 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
+#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+
+In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. 
+In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
+special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka 
+Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
+
+You can specify your custom timestamp extractor/watermark emitter as described 
+[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one from the 
+[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you 
+can pass it to your consumer in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer08<String> myConsumer = 
+    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+
+DataStream<String> stream = env
+	.addSource(myConsumer);
+stream.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+val stream = env
+    .addSource(myConsumer)
+stream.print()
+{% endhighlight %}
+</div>
+</div>
+ 
+Internally, an instance of the assigner is executed per Kafka partition.
+When such an assigner is specified, for each record read from Kafka, the 
+`extractTimestamp(T element, long previousElementTimestamp)` method is called to assign a timestamp to the record, and 
+the `Watermark getCurrentWatermark()` (for periodic) or the 
+`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) method is called to determine 
+whether a new watermark should be emitted and with which timestamp.
+
 ### Kafka Producer
 
 The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns

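Editorial sketch (not part of the commit): the kafka.md hunk above says that one assigner instance runs per Kafka partition. The plain-Java example below has no Flink dependency and illustrates what that could look like for a periodic assigner. The names `PerPartitionWatermarks`, `PartitionAssigner`, `onRecord`, and `combinedWatermark` are hypothetical, as is the fixed out-of-orderness bound. Taking the minimum over the per-partition watermarks is an assumption about how the source combines them; the diff itself does not state it.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of per-partition watermark tracking; not Flink's implementation. */
public class PerPartitionWatermarks {

    /** Hypothetical stand-in for a periodic assigner; one instance per partition. */
    static final class PartitionAssigner {
        private final long maxOutOfOrdernessMs;
        private long maxTimestamp = Long.MIN_VALUE;

        PartitionAssigner(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        /** Plays the role of extractTimestamp: remembers the highest timestamp seen. */
        long extractTimestamp(long recordTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, recordTimestamp);
            return recordTimestamp;
        }

        /** Plays the role of getCurrentWatermark: highest timestamp minus the bound. */
        long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrdernessMs;
        }
    }

    private final Map<Integer, PartitionAssigner> assigners = new HashMap<>();
    private final long maxOutOfOrdernessMs;

    public PerPartitionWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Routes a record to the assigner of its partition, creating it on first use. */
    public long onRecord(int partition, long recordTimestamp) {
        return assigners
                .computeIfAbsent(partition, p -> new PartitionAssigner(maxOutOfOrdernessMs))
                .extractTimestamp(recordTimestamp);
    }

    /** Assumption: the source-level watermark is the minimum across partitions. */
    public long combinedWatermark() {
        if (assigners.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (PartitionAssigner assigner : assigners.values()) {
            min = Math.min(min, assigner.currentWatermark());
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks source = new PerPartitionWatermarks(1000L);
        source.onRecord(0, 10_000L);
        source.onRecord(1, 4_000L);
        source.onRecord(0, 12_000L);
        // Partition 0 is at 11000, partition 1 at 3000, so this prints 3000.
        System.out.println(source.combinedWatermark());
    }
}
```

In this sketch only partitions that have already produced a record are tracked, so a slow partition holds the combined watermark back while an idle, never-seen partition does not. A real consumer may treat idle partitions differently.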
http://git-wip-us.apache.org/repos/asf/flink/blob/7ec6d7b5/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
index fad84a6..493e11a 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -116,10 +116,14 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
 Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
 original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
 
-The timestamp assigners usually are specified immediately after the data source, but it is not strictly required to do so. A
-common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
+The timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so. 
+A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
 In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). 
+(such as the first window operation). As a special case, when using Kafka as the source of a streaming job, 
+Flink allows the specification of a timestamp assigner / watermark emitter inside 
+the source (or consumer) itself. More information on how to do so can be found in the 
+[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html). 
+
 
 **NOTE:** The remainder of this section presents the main interfaces a programmer has
 to implement in order to create her own timestamp extractors/watermark emitters. 
@@ -132,7 +136,9 @@ To see the pre-implemented extractors that ship with Flink, please refer to the
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
+DataStream<MyEvent> stream = env.readFile(
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+        FilePathFilter.createDefaultFilter(), typeInfo);
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
         .filter( event -> event.severity() == WARNING )
@@ -150,7 +156,9 @@ withTimestampsAndWatermarks
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
+val stream: DataStream[MyEvent] = env.readFile(
+         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+         FilePathFilter.createDefaultFilter());
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
         .filter( _.severity == WARNING )

http://git-wip-us.apache.org/repos/asf/flink/blob/7ec6d7b5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 859b962..204557d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -776,8 +776,8 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Assigns timestamps to the elements in the data stream and periodically creates
-	 * watermarks to signal event time progress.
+	 * Assigns timestamps to the elements in the data stream and creates watermarks to
+	 * signal event time progress based on the elements themselves.
 	 *
 	 * <p>This method creates watermarks based purely on stream elements. For each element
 	 * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},