Posted to issues@flink.apache.org by "olivier sohn (JIRA)" <ji...@apache.org> on 2017/09/01 15:24:00 UTC

[jira] [Created] (FLINK-7569) '1 record' delay for counted windowed streams

olivier sohn created FLINK-7569:
-----------------------------------

             Summary: '1 record' delay for counted windowed streams
                 Key: FLINK-7569
                 URL: https://issues.apache.org/jira/browse/FLINK-7569
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.3.2
         Environment: osx
            Reporter: olivier sohn
            Priority: Minor



In the example below (described at the end of the class documentation), Flink waits until the first element of the next window is received before "computing" the current window. I suspect this was necessary for time-based windows, but this window is count-based, so it should be possible to specialize the behaviour: as soon as the window holds the required number of elements, it should be processed, instead of waiting for the first element of the next window.
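To make the difference concrete, here is a hypothetical plain-Java model of a size-2 tumbling count window that sums integers. It is not Flink code: the class `CountWindowFiring` and its methods are invented for illustration. The `eager` policy adds the incoming element and then fires as soon as the window is full, which is the behaviour requested here. The `lagging` policy checks whether the window is full *before* adding the incoming element, which reproduces the observed '1 record' delay. This is a sketch of two firing policies under these assumptions, not a description of Flink's internal trigger implementation.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a tumbling count window of size n that sums integers.
 * Not Flink's implementation: it only contrasts two ways of deciding when a
 * full window is emitted.
 */
public class CountWindowFiring {

    /** Eager policy: add the element first, then fire as soon as n elements are buffered. */
    public static List<String> eager(int[] input, int n) {
        List<String> log = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < input.length; i++) {
            buffer.add(input[i]);
            if (buffer.size() == n) {
                log.add("after input #" + (i + 1) + " emit " + sum(buffer));
                buffer.clear();
            }
        }
        return log;
    }

    /**
     * Lagging policy: check for a full window BEFORE adding the new element, so a
     * full window is only emitted when the first element of the next window arrives.
     */
    public static List<String> lagging(int[] input, int n) {
        List<String> log = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < input.length; i++) {
            if (buffer.size() == n) {
                log.add("after input #" + (i + 1) + " emit " + sum(buffer));
                buffer.clear();
            }
            buffer.add(input[i]);
        }
        return log;
    }

    private static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5};
        System.out.println("eager:   " + eager(input, 2));
        System.out.println("lagging: " + lagging(input, 2));
    }
}
{code}

With inputs 1 to 5, `eager` emits 3 after the second input and 7 after the fourth, while `lagging` emits 3 only after the third input and 7 only after the fifth. That matches the reproduction steps below, where '5' must be sent before '7' appears.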

{code:java}

/**
 * Example of how to read records with a Kafka consumer and write the sum of each size-2 window with a Kafka producer
 *
 *   For example, it transforms
 *
 *   1 1 2 1 3 2 1
 *
 *   into
 *
 *   2 3 5
 *
 * Note that the Kafka source and sink expect the following parameters to be set:
 *  - "bootstrap.servers" (comma-separated list of Kafka brokers)
 *  - "zookeeper.connect" (comma-separated list of ZooKeeper servers)
 *
 * Note that the Kafka source also expects the following parameters to be set:
 *  - "topicIn": the name of the topic to read data from
 *  - "group.id": the ID of the consumer group
 *
 * Note that the Kafka sink also expects the following parameter to be set:
 *  - "topicOut": the name of the topic to write data to
 *
 * You can pass these required parameters using
 * "--bootstrap.servers host:port,host1:port1
 *  --zookeeper.connect host:port
 *  --topicIn testTopicIn
 *  --topicOut testTopicOut"
 *
 * Example of valid arguments:
 * 		--topicIn testIn
 * 		--topicOut testOut
 * 		--bootstrap.servers 172.22.12.3:49092
 * 		--zookeeper.connect 172.22.12.3:22181/dev
 * 		--group.id myGroup
 *
 *
 * If a LeaderNotAvailableException is raised, the topic most likely does not exist.
 *
 * It can be created using the command:
 * > kafka-topics --create
 *                --zookeeper 172.22.12.3:22181/dev
 *                --partitions 2
 *                --replication-factor 1
 *                --topic testTopicOut
 *
 * To test that the example works:
 * ------------------------------
 *
 * listen for results:
 * > kafka-console-consumer --bootstrap-server 172.22.12.3:49092
 *                          --whitelist testTopicOut
 *
 * run this class's main method
 *
 * create inputs:
 * > echo 1 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 2 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 3 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 4 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 5 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 *
 * in the console that listens for results, you should see:
 * > 3
 * > 7
 *
 * Note that the last input, '5', is necessary in this case: '7' is only emitted once it arrives.
 * It seems Flink waits for the first element of the next window to be received before emitting the current window.
 */
public class ReadFromKafkaWriteWindowedSumIntoKafka {

	public static void main(String[] args) throws Exception {
		// create flink execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// parse user parameters
		ParameterTool parameterTool = ParameterTool.fromArgs(args);

		// add a kafka source which reads from 'topicIn'
		DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
				parameterTool.getRequired("topicIn"), new SimpleStringSchema(), parameterTool.getProperties()));

		messageStream

				// group stream elements 2 by 2
				.window(new Count(2))

				// sum the windowed stream
				.foldWindow(0, new FoldFunction<String, Integer>() {
					@Override
					public Integer fold(Integer accumulator, String value) throws Exception {
						return accumulator + Integer.parseInt(value);
					}
				})

				// convert DiscretizedStream to DataStream
				.flatten()

				// convert stream data from Integer to String
				.map(new MapFunction<Integer, String>() {
					@Override
					public String map(Integer value) throws Exception {
						return Integer.toString(value);
					}
				})

				// add a Kafka sink which writes into 'topicOut'
				.addSink(new KafkaSink<>(
						parameterTool.getRequired("bootstrap.servers"),
						parameterTool.getRequired("topicOut"),
						new WriteIntoKafka.SimpleStringSchema()));

		env.execute();
	}
}
{code}
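The pipeline above folds each count-2 window starting from 0 and then maps the sum to a String. The following hypothetical plain-Java model of that fold-then-map chain reproduces the transformation stated in the class documentation, '1 1 2 1 3 2 1' into '2 3 5'. The class `FoldedCountWindows` is invented for illustration, uses the eager firing policy (emit when the n-th element of a window arrives), and is not Flink code.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical model of the report's pipeline: tumbling count windows of size n,
 * each folded from an initial value, then mapped to an output value. Not Flink code.
 */
public class FoldedCountWindows {

    public static <I, A, O> List<O> run(List<I> input, int n, A initial,
                                        BiFunction<A, I, A> fold,
                                        Function<A, O> map) {
        List<O> output = new ArrayList<>();
        A acc = initial;
        int count = 0;
        for (I value : input) {
            acc = fold.apply(acc, value);   // plays the role of FoldFunction.fold
            count++;
            if (count == n) {               // window complete: emit immediately
                output.add(map.apply(acc)); // plays the role of the MapFunction
                acc = initial;              // assumes an immutable accumulator (e.g. Integer)
                count = 0;
            }
        }
        // A trailing window with fewer than n elements is never emitted.
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1 1 2 1 3 2 1".split(" "));
        List<String> sums = run(input, 2, 0,
                (acc, s) -> acc + Integer.parseInt(s),
                acc -> Integer.toString(acc));
        System.out.println(sums); // [2, 3, 5]
    }
}
{code}

In this model the trailing '1' is never emitted because its window never reaches two elements, which is consistent with the documented output '2 3 5'. Resetting the accumulator to `initial` is only safe for immutable accumulators such as the `Integer` used by the report's `FoldFunction`.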



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)