Posted to user@flink.apache.org by "Sofer, Tovi " <to...@citi.com> on 2017/09/19 15:02:41 UTC

Flink Kafka consumer that reads from two partitions in local mode

Hi,

I am trying to set up a FlinkKafkaConsumer which reads from two partitions in local mode, using setParallelism=2.
The producer writes to two partitions (as shown in the metrics report).
But the consumer always seems to read from one partition only.
Am I missing something in the partition configuration?

Code:


Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
env.setParallelism(2);
String kafkaPort = parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
SingleOutputStreamOperator<String> fixMsgSource = env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema()))
        .name("fix_topic");
env.execute("MsgSimulatorJob");


Consumer setup:
String topicName = "fix";
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface
DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new SimpleStringAndTimestampDeserializationSchema();
FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(topicName, deserializationSchema, kafkaParams.getProperties());
DataStream<Tuple2<Long, String>> fixMessagesStream = env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
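(For reference, kafkaParams is our own config helper; the properties it hands to the consumer are essentially the standard Kafka consumer settings, along the lines of the following sketch; the group id here is illustrative:)

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:" + kafkaPort); // broker address
props.setProperty("group.id", "fix-consumer-group");              // consumer group id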

As you can see in the output, only 1 consumer partition seems to be used:
Producer output:
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.1.numRecordsInPerSecond: 19836.033333333333
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.0.numRecordsInPerSecond: 20337.933333333334
2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
Consumer output:
2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0


Thanks and regards,
Tovi

RE: Flink Kafka consumer that reads from two partitions in local mode

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Glad you sorted it out :)

AFAIK, the number of partitions for automatically created topics cannot be specified through the Kafka client.
If you want topics to be created with 2 partitions, you’ll have to change the default for that in the broker configuration.
Or, simply create the topic with the desired number of partitions.
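For example, the broker-side default is num.partitions in server.properties; topics that get auto-created afterwards would then have 2 partitions:

# in KAFKA_HOME/config/server.properties
num.partitions=2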

Cheers,
Gordon

RE: Flink Kafka consumer that reads from two partitions in local mode

Posted by "Sofer, Tovi " <to...@citi.com>.
Hi,

Issue was solved.
After your guidance on the producer part, I checked in Kafka and saw that the topic had been created with one partition.
I re-created it manually with two partitions and that fixed the problem.

// update in KAFKA_HOME/config/server.properties: set delete.topic.enable=true
%KAFKA_HOME%\bin\windows\kafka-topics.bat --delete --topic fix --zookeeper localhost:2181
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --partitions 2 --topic fix --zookeeper localhost:2181 --replication-factor 1
%KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
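To double-check the result, --describe shows the partition count of the re-created topic:

%KAFKA_HOME%\bin\windows\kafka-topics.bat --describe --topic fix --zookeeper localhost:2181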

A follow-up question: is it possible to create the topic with two partitions while creating the FlinkKafkaProducer?
By default it seems to be created with one partition.

Thanks and regards,
Tovi

RE: Flink Kafka consumer that reads from two partitions in local mode

Posted by "Sofer, Tovi " <to...@citi.com>.
Hi Gordon,

Thanks for your assistance.


·         We are currently running Flink in local mode (MiniCluster), using Flink 1.3.2 and flink-connector-kafka-0.10_2.10.


·         In the consumer log I see only 1 partition (when parallelism=1), so the problem indeed seems to be in the producer.
2017-09-25 17:10:58,140 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - [Source: fix_topic -> FixMapper (1/1)] The configuration 'topic.name' was supplied but isn't a known config.
2017-09-25 17:10:58,143 INFO  org.apache.kafka.common.utils.AppInfoParser - [Source: fix_topic -> FixMapper (1/1)] Kafka version : 0.10.2.1
2017-09-25 17:10:58,144 INFO  org.apache.kafka.common.utils.AppInfoParser - [Source: fix_topic -> FixMapper (1/1)] Kafka commitId : e89bffd6b2eff799
2017-09-25 17:10:58,679 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: fix_topic -> FixMapper (1/1)] Got 1 partitions from these topics: [fix]
2017-09-25 17:10:58,679 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: fix_topic -> FixMapper (1/1)] Consumer is going to read the following topics (with number of partitions): fix (1),
2017-09-25 17:10:58,680 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - [Source: fix_topic -> FixMapper (1/1)] Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='fix', partition=0}]


·         The producer seems to write to one partition only.

internalProducer.topicPartitionsMap and Cluster.Partitions seem to have one partition for the fix topic.



In the producer log, each producer starts with the configuration below:

2017-09-25 17:06:49,596 INFO  org.apache.kafka.clients.producer.ProducerConfig - [Source: random -> Sink: fixTopicSink (2/2)] ProducerConfig values:
                acks = 1
                batch.size = 16384
                block.on.buffer.full = false
                bootstrap.servers = [localhost:9092]
                buffer.memory = 33554432
                client.id =
                compression.type = none
                connections.max.idle.ms = 540000
                interceptor.classes = null
                key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
                linger.ms = 0
                max.block.ms = 60000
                max.in.flight.requests.per.connection = 5
                max.request.size = 1048576
                metadata.fetch.timeout.ms = 60000
                metadata.max.age.ms = 300000
                metric.reporters = []
                metrics.num.samples = 2
                metrics.sample.window.ms = 30000
                partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
                receive.buffer.bytes = 32768
                reconnect.backoff.ms = 50
                request.timeout.ms = 30000
                retries = 0
                retry.backoff.ms = 100
                sasl.jaas.config = null
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.mechanism = GSSAPI
                security.protocol = PLAINTEXT
                send.buffer.bytes = 131072
                ssl….
                timeout.ms = 30000
                value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

And prints when starting:

2017-09-25 17:07:46,907 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - [Source: random -> Sink: fixTopicSink (2/2)] Starting FlinkKafkaProducer (2/2) to produce into default topic fix


Thanks,
Tovi

RE: Flink Kafka consumer that reads from two partitions in local mode

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Tovi,

Your code seems to be correct, and as Fabian described, you don’t need parallelism of 2 to read 2 partitions; a single parallel instance of the source can read multiple partitions.

I’m not sure what could have possibly gone wrong at the moment from a first look, so I may need to randomly ask you some questions:

Could you let me know which Flink version you are on?
Also, could you try searching in the log to see if you find consumer logs such as:
“Consumer subtask ... will start reading the following (numPartitions) partitions from ...: (partition info)”
You can try setting parallelism of the source to 1, and you should see that the subtask is reading 2 partitions.

From the metrics log it does seem like the consumer has picked up both partitions 0 and 1, but no records seem to be coming from partition 0. Have you perhaps tried using a non-Flink consumer, perhaps the simple console consumer, to read the topic, and see if records from both partitions are consumed properly?
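For example (just a sketch, assuming a standard Kafka installation; use the bin\windows\*.bat equivalent if you are on Windows), the GetOffsetShell tool prints the latest offset of each partition, which would show directly whether the producer is writing to both:

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic fix --time -1

It prints one topic:partition:offset line per partition; if partition 0 stays at offset 0, the problem is on the producer side.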

Let me know, I’m sure we can figure this out somehow.

Cheers,
Gordon

RE: Flink Kafka consumer that reads from two partitions in local mode

Posted by "Sofer, Tovi " <to...@citi.com>.
Thank you Fabian.

Fabian, Gordon, am I missing something in the consumer setup?
Should I configure the consumer in some way to subscribe to both partitions?

Thanks and regards,
Tovi

Re: Flink Kafka consumer that reads from two partitions in local mode

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.
Just a side note: you don't need to set the parallelism to 2 to read from two partitions. A single consumer instance can read from multiple partitions.
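For illustration, a minimal sketch using the consumer variable from your snippet:

// one source subtask; Flink assigns it all partitions of the topic
DataStream<Tuple2<Long, String>> fixMessagesStream = env.addSource(kafkaConsumer).name("fix_topic").setParallelism(1);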

Best,
Fabian
