Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/06/21 09:21:36 UTC

[GitHub] flink pull request #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Gen...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2142

    [FLINK-3752] Add Per-Kafka-Partition Watermark Generation to the docs

    As the name implies, this PR only touches the documentation and describes how to assign timestamps and emit watermarks from within a Kafka source.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink kafka_doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2142
    
----
commit 0eb4afadb7dbeecac1b22d632a67004cf172bf53
Author: kl0u <kk...@gmail.com>
Date:   2016-06-20T14:47:03Z

    [FLINK-3752] Add Per-Kafka-Partition Watermark Generation to the docs

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Gen...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2142#discussion_r68060615
  
    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -198,6 +198,63 @@ Flink on YARN supports automatic restart of lost YARN containers.
     
     If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
     
    +#### Kafka Consumers and Timestamp Extraction/Watermark Emission
    +
    +In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. 
    +In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
    +special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka 
    +Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
    +
    +You can specify your custom timestamp extractor/ watermark emitter as described 
    --- End diff --
    
    space after /
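
    For reference, a periodic assigner as introduced in the quoted section could look
    roughly like the following sketch. It is illustrative only: the MyEvent type, its
    getCreationTime() accessor, and the 3.5 second lateness bound are assumptions made
    for this example and do not appear in the PR.

        import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
        import org.apache.flink.streaming.api.watermark.Watermark;

        // Illustrative sketch only: MyEvent and getCreationTime() are hypothetical.
        public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

            private static final long MAX_LATENESS = 3500L; // 3.5 seconds, chosen for illustration

            private long currentMaxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
                // take the event time embedded in the record and remember the largest one seen
                long timestamp = element.getCreationTime();
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                return timestamp;
            }

            @Override
            public Watermark getCurrentWatermark() {
                // called periodically by the framework; trail the largest timestamp seen so far
                return currentMaxTimestamp == Long.MIN_VALUE
                        ? new Watermark(Long.MIN_VALUE)
                        : new Watermark(currentMaxTimestamp - MAX_LATENESS);
            }
        }

    Such an assigner would be handed to the consumer via assignTimestampsAndWatermarks(...),
    exactly as shown in the full snippet later in this thread.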



[GitHub] flink issue #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Generation...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2142
  
    @rmetzger I integrated your comments.
    Let me know if there are any follow-ups.



[GitHub] flink pull request #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Gen...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2142



[GitHub] flink issue #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Generation...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2142
  
    Thanks @rmetzger 



[GitHub] flink issue #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Generation...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2142
  
    Thank you for opening the pull request.
    There's one issue that needs addressing; once that's done, we can merge it.



[GitHub] flink issue #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Generation...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2142
  
    Okay, merging ...



[GitHub] flink pull request #2142: [FLINK-3752] Add Per-Kafka-Partition Watermark Gen...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2142#discussion_r68061306
  
    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -198,6 +198,63 @@ Flink on YARN supports automatic restart of lost YARN containers.
     
     If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
     
    +#### Kafka Consumers and Timestamp Extraction/Watermark Emission
    +
    +In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. 
    +In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
    +special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka 
    +Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
    +
    +You can specify your custom timestamp extractor/ watermark emitter as described 
    +[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one from the 
    +[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you 
    +can pass it to your consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +Properties properties = new Properties();
    +properties.setProperty("bootstrap.servers", "localhost:9092");
    +// only required for Kafka 0.8
    +properties.setProperty("zookeeper.connect", "localhost:2181");
    +properties.setProperty("group.id", "test");
    +
    +FlinkKafkaConsumer08<String> myConsumer = 
    +    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
    +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
    +
    +DataStream<String> stream = env.addSource(myConsumer);
    +stream.print();
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val properties = new Properties()
    +properties.setProperty("bootstrap.servers", "localhost:9092")
    +// only required for Kafka 0.8
    +properties.setProperty("zookeeper.connect", "localhost:2181")
    +properties.setProperty("group.id", "test")
    +
    +val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
    +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    +
    +val stream = env.addSource(myConsumer)
    +stream.print()
    +{% endhighlight %}
    +</div>
    +</div>
    + 
    +When such an assigner is specified, for each record read from Kafka, the 
    +`extractTimestamp(T element, long previousElementTimestamp)` is called to assign a timestamp to the record and 
    +the `Watermark getCurrentWatermark()` (for periodic) or the 
    +`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine 
    +if a new watermark should be emitted and with which timestamp.
    +
    +Internally, an instance of the assigner is executed per Kafka partition, and when a single task is reading 
    +from multiple partitions, watermark alignment is done as described 
    +[here](http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/).
    --- End diff --
    
    Could you remove the link to the data Artisans blog here?
    The Flink documentation should be self-contained and independent of material from any companies.
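
    The snippet above passes a CustomWatermarkEmitter to the consumer without defining it.
    A minimal sketch of what a punctuated implementation for the String records in the
    example could look like follows; the "payload|timestampMillis" record layout and the
    "WATERMARK|" marker prefix are assumptions made purely for illustration and are not
    part of the PR.

        import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
        import org.apache.flink.streaming.api.watermark.Watermark;

        // Illustrative sketch only: the record format ("payload|timestampMillis") and the
        // "WATERMARK|" marker convention are assumptions, not something defined in the PR.
        public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // the event timestamp is assumed to be the last '|'-separated field
                return Long.parseLong(element.substring(element.lastIndexOf('|') + 1));
            }

            @Override
            public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                // emit a watermark only for the special marker records; return null for all others
                return lastElement.startsWith("WATERMARK|") ? new Watermark(extractedTimestamp) : null;
            }
        }

    Returning null from checkAndGetNextWatermark(...) emits no watermark for that record,
    which is how a punctuated assigner restricts watermark emission to the special marker
    records described in the quoted documentation.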

