Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/03/15 15:25:45 UTC

[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/1795

    Kafka-3375: Allows Watermark Generation in the Kafka Source.

    In a nutshell, this PR allows the generation of watermarks in the Kafka source, based on a user-provided function. This addresses the situation where a Kafka source task reads from multiple partitions, so records can arrive out of order before timestamps are extracted and watermarks are generated.
    
    This PR implements two new variants of the FlinkKafkaConsumer for each of the supported Kafka versions (0.8 and 0.9). Each variant takes one additional constructor argument: an AssignerWithPeriodicWatermarks timestamp extractor for the first, and an AssignerWithPunctuatedWatermarks for the second.
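
    A minimal usage sketch of the proposed API, assuming the constructor signature shown in this PR's diff (the BoundedLatenessAssigner class, the record format, and the connection properties below are made up for the example; the final API may differ):

    import java.util.Properties;

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08WithPeriodicWM;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPeriodicWatermarkDemo {

        // Hypothetical assigner: remember the highest timestamp seen so far and emit
        // watermarks that trail it by a fixed out-of-orderness bound of 5 seconds.
        public static class BoundedLatenessAssigner
                implements AssignerWithPeriodicWatermarks<String> {

            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // assumes records of the form "timestampMillis,payload"
                long ts = Long.parseLong(element.substring(0, element.indexOf(',')));
                maxTimestamp = Math.max(maxTimestamp, ts);
                return ts;
            }

            @Override
            public Watermark getCurrentWatermark() {
                return maxTimestamp == Long.MIN_VALUE
                        ? null : new Watermark(maxTimestamp - 5000);
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(200); // periodic watermark emission interval (ms)

            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181");
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            // Constructor as proposed in this PR: topic, deserializer, properties, assigner.
            env.addSource(new FlinkKafkaConsumer08WithPeriodicWM<>(
                    "my-topic", new SimpleStringSchema(), props, new BoundedLatenessAssigner()))
               .print();

            env.execute("Kafka periodic watermark demo");
        }
    }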

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink kafka_wm

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1795.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1795
    
----
commit d723d5e210e7e7d4f92424acb7ea177c1836bdc5
Author: kl0u <kk...@gmail.com>
Date:   2016-03-08T16:35:14Z

    Adds the Kafka09 punctuated source.

commit cbdbf3e4eafe59d7f8e8a5be2e7f408399e6e8d3
Author: kl0u <kk...@gmail.com>
Date:   2016-03-09T11:08:50Z

    Adds also the periodic watermark source.

commit c1af0a02dfae407bd95fccc1f716ad6e6deff004
Author: kl0u <kk...@gmail.com>
Date:   2016-03-09T13:06:04Z

    Code cleaning of KafkaConsumer09.

commit 5c4f3309cd1cef39015e0174e17d49fa6c6d8c0c
Author: kl0u <kk...@gmail.com>
Date:   2016-03-09T16:34:15Z

    Code cleaning of KafkaConsumer09.

commit 34eadbc2c3396cc0fda2b6961f350c5c3f567d79
Author: kl0u <kk...@gmail.com>
Date:   2016-03-09T20:52:13Z

    Testing.

commit 454717ccc5e9e78f0f0fd69df5eb7d9821a7942d
Author: kl0u <kk...@gmail.com>
Date:   2016-03-10T15:04:02Z

    Refactor code.

commit e8eac33ac9931cb85a1fdbb2bfc6691e01235d0e
Author: kl0u <kk...@gmail.com>
Date:   2016-03-14T13:36:17Z

    Added functionality for supporting empty partitions.

commit 87e18cba8de4ed58e4676847c0635ad7ebdb4621
Author: kl0u <kk...@gmail.com>
Date:   2016-03-14T14:03:23Z

    Removed guava uses.

commit 091a2e3ff9dc4168fb16b461a86b2086685db424
Author: kl0u <kk...@gmail.com>
Date:   2016-03-14T14:41:04Z

    Fixed a bug in the tests.

commit 0f115405578722153e5bdba2a8c64f3bf5a52b49
Author: kl0u <kk...@gmail.com>
Date:   2016-03-14T15:35:30Z

    Added documentation.

commit 5aa79ba01683370efc90fe4b0cccb7d852c50a2a
Author: kl0u <kk...@gmail.com>
Date:   2016-03-15T14:13:32Z

    Fixed BUG in Periodic timestamp sources.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1795#issuecomment-197241772
  
    It looks like the change is API-breaking in terms of binary compatibility because it changes the base class of the `FlinkKafkaConsumer08` and `FlinkKafkaConsumer09`. I wonder if that is a problem.
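
    A minimal, hypothetical sketch of the kind of already-compiled user code such a base-class change can affect (LegacyUserCode is made up for the example; whether anything actually breaks depends on whether FlinkKafkaConsumerBase stays in the consumers' superclass chain):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class LegacyUserCode {

        // Compiled against an older connector jar in which FlinkKafkaConsumer08
        // extended FlinkKafkaConsumerBase directly.
        public static FlinkKafkaConsumerBase<String> buildConsumer(Properties props) {
            return new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);
        }
    }

    // Classes like this keep linking only as long as FlinkKafkaConsumerBase remains a
    // (possibly indirect) superclass of FlinkKafkaConsumer08. Inserting an intermediate
    // base class that itself still extends FlinkKafkaConsumerBase leaves existing
    // binaries working; removing FlinkKafkaConsumerBase from the superclass chain makes
    // the JVM reject them at link time.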



[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1795#issuecomment-203373358
  
    No, I opened a new one: https://github.com/apache/flink/pull/1839



[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1795#issuecomment-197244578
  
    Thanks a lot @mxm!
    If it is a problem, I can move the watermark-related code into the FlinkKafkaConsumerBase class.
    I kept it separate only because, this way, it is clearer which code is responsible for which functionality.



[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/1795



[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1795#issuecomment-203368412
  
    @kl0u Has this been merged or are you going to open a new pr?



[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1795#discussion_r56187319
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08WithPeriodicWM.java ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * An implementation of a {@link FlinkKafkaComsumerWithWMBase} for Apache Kafka 0.8.x, that emits
    + * watermarks periodically. The user has to provide a {@link AssignerWithPeriodicWatermarks}.
    + * */
    +public class FlinkKafkaConsumer08WithPeriodicWM<T> extends FlinkKafkaConsumer08Base<T> implements Triggerable {
    +
    +	/**
    +	 * The user-specified methods to extract the timestamps from the records in Kafka, and
    +	 * to decide when to emit watermarks.
    +	 */
    +	private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
    +
    +	/**
    +	 * The interval between periodic watermark emissions, as configured via the
    +	 * {@link ExecutionConfig#getAutoWatermarkInterval()}.
    +	 */
    +	private long watermarkInterval = -1;
    +
    +	private StreamingRuntimeContext runtime = null;
    +
    +	private SourceContext<T> srcContext = null;
    +
    +	/**
    +	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
    +	 *
    +	 * @param topic
    +	 *           The name of the topic that should be consumed.
    +	 * @param valueDeserializer
    +	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
    +	 * @param props
    +	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
    +	 * @param timestampAssigner
    +	 *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
    +	 *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
    +	 */
    +	public FlinkKafkaConsumer08WithPeriodicWM(String topic,
    +												DeserializationSchema<T> valueDeserializer,
    +												Properties props,
    +												AssignerWithPeriodicWatermarks<T> timestampAssigner) {
    +		this(Collections.singletonList(topic), valueDeserializer, props, timestampAssigner);
    +	}
    +
    +	/**
    +	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
    +	 *
    +	 * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
    +	 * pairs, offsets, and topic names from Kafka.
    +	 *
    +	 * @param topic
    +	 *           The name of the topic that should be consumed.
    +	 * @param deserializer
    +	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
    +	 * @param props
    +	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
    +	 * @param timestampAssigner
    +	 *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
    +	 *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
    +	 */
    +	public FlinkKafkaConsumer08WithPeriodicWM(String topic,
    +												KeyedDeserializationSchema<T> deserializer,
    +												Properties props,
    +												AssignerWithPeriodicWatermarks<T> timestampAssigner) {
    +		this(Collections.singletonList(topic), deserializer, props, timestampAssigner);
    +	}
    +
    +	/**
    +	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
    +	 *
    +	 * This constructor allows passing multiple topics to the consumer.
    +	 *
    +	 * @param topics
    +	 *           The Kafka topics to read from.
    +	 * @param deserializer
    +	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
    +	 * @param props
    +	 *           The properties that are used to configure both the fetcher and the offset handler.
    +	 * @param timestampAssigner
    +	 *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
    +	 *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
    +	 */
    +	public FlinkKafkaConsumer08WithPeriodicWM(List<String> topics,
    +												DeserializationSchema<T> deserializer,
    +												Properties props,
    +												AssignerWithPeriodicWatermarks<T> timestampAssigner) {
    +		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props, timestampAssigner);
    +	}
    +
    +	/**
    +	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
    +	 *
    +	 * This constructor allows passing multiple topics and a key/value deserialization schema.
    +	 *
    +	 * @param topics
    +	 *           The Kafka topics to read from.
    +	 * @param deserializer
    +	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
    +	 * @param props
    +	 *           The properties that are used to configure both the fetcher and the offset handler.
    +	 * @param timestampAssigner
    +	 *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
    +	 *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
    +	 */
    +	public FlinkKafkaConsumer08WithPeriodicWM(List<String> topics,
    +												KeyedDeserializationSchema<T> deserializer,
    +												Properties props,
    +												AssignerWithPeriodicWatermarks<T> timestampAssigner) {
    +		super(topics, deserializer, props);
    +		this.periodicWatermarkAssigner = requireNonNull(timestampAssigner);
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		super.open(parameters);
    +		if(runtime == null) {
    +			runtime = (StreamingRuntimeContext) getRuntimeContext();
    +		}
    +
    +		watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    +		if (watermarkInterval > 0) {
    +			runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this);
    +		}
    +	}
    +
    +	@Override
    +	public void processElement(SourceFunction.SourceContext<T> sourceContext, String topic, int partition, T value) {
    +		if(srcContext == null) {
    +			srcContext = sourceContext;
    +		}
    +
    +		// extract the timestamp based on the user-specified extractor
    +		// emits the element with the new timestamp
    +		// updates the list of minimum timestamps seen per topic per partition (if necessary)
    +
    +		long extractedTimestamp = periodicWatermarkAssigner.extractTimestamp(value, Long.MIN_VALUE);
    +		sourceContext.collectWithTimestamp(value, extractedTimestamp);
    +		updateMaximumTimestampForPartition(topic, partition, extractedTimestamp);
    +	}
    +
    +	@Override
    +	public void trigger(long timestamp) throws Exception {
    +		if(srcContext == null) {
    +			// if the trigger is called before any elements, then we
    +			// just set the next timer to fire when it should and we
    +			// ignore the triggering as this would produce no results.
    +
    +			setNextWatermarkTimer();
    +			return;
    +		}
    +
    +		final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark();
    +		if(nextWatermark != null) {
    +			emitWatermarkIfMarkingProgress(srcContext);
    +		}
    +		setNextWatermarkTimer();
    +	}
    +
    +	private void setNextWatermarkTimer() {
    +		long timeToNextWatermark = getTimeToNextWaternark();
    +		runtime.registerTimer(timeToNextWatermark, this);
    +	}
    +
    +	private long getTimeToNextWaternark() {
    --- End diff --
    
    typo: Watermark

