Posted to user@flink.apache.org by "r. r." <ro...@abv.bg> on 2017/11/17 15:41:04 UTC

all task managers reading from all kafka partitions

Hi

I have this strange problem: 4 task managers each with one task slot, attaching to the same Kafka topic which has 10 partitions.

When I post a single message to the Kafka topic it seems that all 4 consumers fetch the message and start processing (confirmed by TM logs).

If I run kafka-consumer-groups.sh --describe --group TopicConsumers it shows that only one message was posted, to a single partition. The next message generally goes to another partition.

In addition, while the Flink jobs are running on the message, I start two kafka-console-consumer.sh and each would get only one message, as expected.

On startup, each Flink TM logs something that, to me, reads as if it will read from all partitions:

2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10), 
2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest



Any hints?



Re: all task managers reading from all kafka partitions

Posted by "r. r." <ro...@abv.bg>.
Gary, thanks a lot!
I completely forgot that parallelism extends over all slots visible to the JobManager!
So adding e.g. -p4 to 'flink run' should suit my use case just fine, I believe.
I'll look deeper into failure recovery with this scheme.

Have a great weekend!
-Robert
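
To make the parallelism point concrete, here is a minimal Python sketch of how partitions end up distributed in the two setups discussed in this thread. It assumes a simple round-robin rule (partition p goes to subtask p % parallelism); Flink's actual assigner differs in detail, and the function names are made up for illustration. The key assumption, taken from the thread, is that each job assigns partitions on its own without coordinating with other jobs.

```python
from collections import defaultdict


def assign_partitions(num_partitions, parallelism):
    """Simplified model: partition p is read by subtask p % parallelism.

    This is an illustrative assumption, not Flink's exact algorithm.
    """
    assignment = defaultdict(list)
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return {subtask: assignment[subtask] for subtask in range(parallelism)}


def readers_per_partition(job_parallelisms, num_partitions):
    """Count how many subtasks, across all jobs, read each partition.

    Every job assigns partitions independently of the others.
    """
    counts = [0] * num_partitions
    for parallelism in job_parallelisms:
        for partitions in assign_partitions(num_partitions, parallelism).values():
            for partition in partitions:
                counts[partition] += 1
    return counts


# Four separate jobs, each submitted with parallelism 1:
# every job reads all 10 partitions, so each message is processed 4 times.
print(readers_per_partition([1, 1, 1, 1], 10))

# One job submitted with parallelism 4:
# the partitions are split, so each message is processed once.
print(readers_per_partition([4], 10))
print(assign_partitions(10, 4))
```

In this model the first call reports four readers for every partition, which matches the symptom of all four consumers fetching the same message, while the single job with parallelism 4 has exactly one reader per partition.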








> -------- Original message --------
> From: Gary Yao gary@data-artisans.com
> Subject: Re: all task managers reading from all kafka partitions
> To: "r. r." <ro...@abv.bg>
> Sent: 18.11.2017 11:28
>
> Hi Robert,
>
> Running a single job does not mean that you are limited to a single JVM.
>
> For example, a job with parallelism 4 by default requires 4 task slots to run.
> You can provision 4 single slot TaskManagers on different hosts to connect to the
> same JobManager. The JobManager can then take your job and distribute the
> execution on the 4 slots. To learn more about the distributed runtime
> environment:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html
>
> Regarding your concerns about job failures, a failure in the JobManager or one
> of the TaskManagers can bring your job down but Flink has built-in
> fault-tolerance on different levels. You may want to read up on the following
> topics:
>
> - Data Streaming Fault Tolerance:
>   https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
> - Restart Strategies:
>   https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
> - JobManager High Availability:
>   https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html
>
> Let me know if you have further questions.
>
> Best,
>
> Gary
>
> On Fri, Nov 17, 2017 at 11:11 PM, r. r. <ro...@abv.bg> wrote:
>
> > Hmm, but I want single slot task managers and multiple jobs so that if one
> > job fails it doesn't bring the whole setup (for example 30+ parallel
> > consumers) down.
> > What setup would you advise? The job is quite heavy and might bring the VM
> > down if run with such concurrency in one JVM.
> >
> > Thanks!
> >
> > -------- Original message --------
> > From: Gary Yao gary@data-artisans.com
> > Subject: Re: all task managers reading from all kafka partitions
> > To: "r. r." <ro...@abv.bg>
> > Sent: 17.11.2017 22:58
> >
> > > Forgot to hit "reply all" in my last email.
> > >
> > > On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <gary@data-artisans.com> wrote:
> > >
> > > Hi Robert,
> > >
> > > To get your desired behavior, you should start a single job with
> > > parallelism set to 4.
> > >
> > > Flink does not rely on Kafka's consumer groups to distribute the
> > > partitions to the parallel subtasks.
> > > Instead, Flink does the assignment of partitions itself and also
> > > tracks and checkpoints the offsets internally.
> > > This is needed to achieve exactly-once semantics.
> > >
> > > The group.id that you are setting is used for different purposes,
> > > e.g., to track the consumer lag of a job.
> > >
> > > Best,
> > >
> > > Gary
> > >
> > > On Fri, Nov 17, 2017 at 7:54 PM, r. r. <robert@abv.bg> wrote:
> > >
> > > Hi
> > >
> > > it's Flink 1.3.2, Kafka 0.10.2.0
> > > I am starting 1 JM and 4 TM (with 1 task slot each). Then I deploy 4 times
> > > (via ./flink run -p1 x.jar), job parallelism is set to 1.
> > >
> > > A new thing I just noticed: if I start in parallel to the Flink jobs two
> > > kafka-console-consumer (with --consumer-property group.id=TopicConsumers)
> > > and write a msg to Kafka, then one of the console consumers receives the
> > > msg together with both Flink jobs.
> > > I thought maybe the Flink consumers didn't receive the group property
> > > passed via "flink run .. --group.id TopicConsumers", but no - they do
> > > belong to the group as well:
> > >
> > > taskmanager_3  | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
> > > taskmanager_3  |     auto.commit.interval.ms = 5000
> > > taskmanager_3  |     auto.offset.reset = latest
> > > taskmanager_3  |     bootstrap.servers = [kafka:9092]
> > > taskmanager_3  |     check.crcs = true
> > > taskmanager_3  |     client.id =
> > > taskmanager_3  |     connections.max.idle.ms = 540000
> > > taskmanager_3  |     enable.auto.commit = true
> > > taskmanager_3  |     exclude.internal.topics = true
> > > taskmanager_3  |     fetch.max.bytes = 52428800
> > > taskmanager_3  |     fetch.max.wait.ms = 500
> > > taskmanager_3  |     fetch.min.bytes = 1
> > > taskmanager_3  |     group.id = TopicConsumers
> > > taskmanager_3  |     heartbeat.interval.ms = 3000
> > > taskmanager_3  |     interceptor.classes = null
> > > taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > taskmanager_3  |     max.partition.fetch.bytes = 1048576
> > > taskmanager_3  |     max.poll.interval.ms = 300000
> > > taskmanager_3  |     max.poll.records = 500
> > > taskmanager_3  |     metadata.max.age.ms = 300000
> > > taskmanager_3  |     metric.reporters = []
> > > taskmanager_3  |     metrics.num.samples = 2
> > > taskmanager_3  |     metrics.recording.level = INFO
> > > taskmanager_3  |     metrics.sample.window.ms = 30000
> > > taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> > > taskmanager_3  |     receive.buffer.bytes = 65536
> > > taskmanager_3  |     reconnect.backoff.ms = 50
> > > taskmanager_3  |     request.timeout.ms = 305000
> > > taskmanager_3  |     retry.backoff.ms = 100
> > > taskmanager_3  |     sasl.jaas.config = null
> > > taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
> > > taskmanager_3  |     sasl.kerberos.service.name = null
> > > taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
> > > taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
> > > taskmanager_3  |     sasl.mechanism = GSSAPI
> > > taskmanager_3  |     security.protocol = PLAINTEXT
> > > taskmanager_3  |     send.buffer.bytes = 131072
> > > taskmanager_3  |     session.timeout.ms = 10000
> > > taskmanager_3  |     ssl.cipher.suites = null
> > > taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > taskmanager_3  |     ssl.endpoint.identification.algorithm = null
> > > taskmanager_3  |     ssl.key.password = null
> > > taskmanager_3  |     ssl.keymanager.algorithm = SunX509
> > > taskmanager_3  |     ssl.keystore.location = null
> > > taskmanager_3  |     ssl.keystore.password = null
> > > taskmanager_3  |     ssl.keystore.type = JKS
> > > taskmanager_3  |     ssl.protocol = TLS
> > > taskmanager_3  |     ssl.provider = null
> > > taskmanager_3  |     ssl.secure.random.implementation = null
> > > taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
> > > taskmanager_3  |     ssl.truststore.location = null
> > > taskmanager_3  |     ssl.truststore.password = null
> > > taskmanager_3  |     ssl.truststore.type = JKS
> > > taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > taskmanager_3  |
> > > taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'topic' was supplied but isn't a known config.
> > > taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1
> > > taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
> > > taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
> > >
> > > I'm running Kafka and Flink jobs in docker containers, the console-consumers from localhost
> > >
> > > -------- Original message --------
> > > From: Gary Yao gary@data-artisans.com
> > > Subject: Re: all task managers reading from all kafka partitions
> > > To: "r. r." <robert@abv.bg>
> > > Sent: 17.11.2017 20:02
> > >
> > > > Hi Robert,
> > > >
> > > > Can you tell us which Flink version you are using?
> > > > Also, are you starting a single job with parallelism 4 or are you starting several jobs?
> > > >
> > > > Thanks!
> > > >
> > > > Gary
> > > >
> > > > On Fri, Nov 17, 2017 at 4:41 PM, r. r. <robert@abv.bg> wrote:
> > > >
> > > > Hi
> > > >
> > > > I have this strange problem: 4 task managers each with one task slot, attaching to the same Kafka topic which has 10 partitions.
> > > >
> > > > When I post a single message to the Kafka topic it seems that all 4 consumers fetch the message and start processing (confirmed by TM logs).
> > > >
> > > > If I run kafka-consumer-groups.sh  --describe --group TopicConsumers it says that only one message was posted to a single partition. Next message would generally go to another partition.
> > > >
> > > > In addition, while the Flink jobs are running on the message, I start two kafka-console-consumer.sh and each would get only one message, as expected.
> > > >
> > > > On start each of the Flink TM would post something that to me reads as if it would read from all partitions:
> > > >
> > > > 2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
> > > > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
> > > > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
> > > > 2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
> > > >         auto.commit.interval.ms = 5000
> > > >         auto.offset.reset = latest
> > > >
> > > > Any hints?

Re: all task managers reading from all kafka partitions

Posted by Gary Yao <ga...@data-artisans.com>.
Hi Robert,

Running a single job does not mean that you are limited to a single JVM.

For example, a job with parallelism 4 by default requires 4 task slots to run.
You can provision 4 single slot TaskManagers on different hosts to connect to
the same JobManager. The JobManager can then take your job and distribute the
execution on the 4 slots. To learn more about the distributed runtime
environment:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html

Regarding your concerns about job failures, a failure in the JobManager or one
of the TaskManagers can bring your job down, but Flink has built-in
fault tolerance on different levels. You may want to read up on the following
topics:

- Data Streaming Fault Tolerance:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
- Restart Strategies:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
- JobManager High Availability:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html

Let me know if you have further questions.

Best,

Gary
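
The slot requirement described above can be sketched with a small Python model. This is an illustration under simplifying assumptions, not Flink's scheduler: each parallel subtask takes exactly one free slot, and the TaskManager names (tm-1 to tm-4) and the function name are hypothetical.

```python
def place_subtasks(parallelism, task_managers):
    """Give each parallel subtask one free slot.

    task_managers maps a (hypothetical) TaskManager name to its slot count.
    Raises RuntimeError when the cluster offers fewer slots than needed.
    """
    # Flatten the cluster into a list with one entry per available slot.
    free_slots = [name for name, slots in task_managers.items() for _ in range(slots)]
    if parallelism > len(free_slots):
        raise RuntimeError(
            f"need {parallelism} slots, only {len(free_slots)} available"
        )
    return {subtask: free_slots[subtask] for subtask in range(parallelism)}


# Four single-slot TaskManagers, possibly on four different hosts.
cluster = {"tm-1": 1, "tm-2": 1, "tm-3": 1, "tm-4": 1}

# One job with parallelism 4 is spread over all four JVMs.
placement = place_subtasks(4, cluster)
print(placement)
```

With this layout each subtask of the single job lands in its own TaskManager JVM, so the heavy work is not concentrated in one process. Asking for a parallelism of 5 on the same cluster raises an error in this model, because only 4 slots exist.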

On Fri, Nov 17, 2017 at 11:11 PM, r. r. <ro...@abv.bg> wrote:

> Hmm, but I want single slot task managers and multiple jobs so that if one
> job fails it doesn't bring the whole setup (for example 30+ parallel
> consumers) down.
> What setup would you advise? The job is quite heavy and might bring the VM
> down if run with such concurency in one JVM.
>
> Thanks!
>
>
>
>
>
>
>
>  >-------- Оригинално писмо --------
>
>  >От: Gary Yao gary@data-artisans.com
>
>  >Относно: Re: all task managers reading from all kafka partitions
>
>  >До: "r. r." <ro...@abv.bg>
>
>  >Изпратено на: 17.11.2017 22:58
>
>
>
>
> >
>
> >
>
> >
>
> >
>
> >    Forgot to hit "reply all" in my last email.
>
> >
>
> >
>
> >
>
> >
>
> >      On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao
>
> >      <ga...@data-artisans.com> wrote:
>
> >
>
> >
>
> >
>
> >        Hi Robert,
>
> >
>
> >
>
> >
>
> >
>
> >         To get your desired behavior, you should start a single job with
> parallelism set to 4.
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >         Flink does not rely on Kafka's consumer groups to distribute the
> partitions to the parallel subtasks.
>
> >
>
> >
>
> >         Instead, Flink does the assignment of partitions itself and also
> tracks and checkpoints the offsets internally.
>
> >
>
> >
>
> >         This is needed to achieve exactly-once semantics.
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >         The
>
> >         group.id that you are setting is used for different purposes,
> e.g., to track the consumer lag of a job.
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >         Best,
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >         Gary
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >
>
> >
>

Re: all task managers reading from all kafka partitions

Posted by "r. r." <ro...@abv.bg>.
Hmm, but I want single-slot task managers and multiple jobs, so that if one job fails it doesn't bring the whole setup (for example, 30+ parallel consumers) down.
What setup would you advise? The job is quite heavy and might bring the VM down if run with that much concurrency in one JVM.

Thanks!







 >-------- Original message --------
 >From: Gary Yao gary@data-artisans.com
 >Subject: Re: all task managers reading from all kafka partitions
 >To: "r. r." <ro...@abv.bg>
 >Sent: 17.11.2017 22:58

> Forgot to hit "reply all" in my last email.
>
> On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <ga...@data-artisans.com> wrote:
>
> Hi Robert,
>
> To get your desired behavior, you should start a single job with parallelism set to 4.
>
> Flink does not rely on Kafka's consumer groups to distribute the partitions to the parallel subtasks.
> Instead, Flink does the assignment of partitions itself and also tracks and checkpoints the offsets internally.
> This is needed to achieve exactly-once semantics.
>
> The group.id that you are setting is used for different purposes, e.g., to track the consumer lag of a job.
>
> Best,
>
> Gary
>

Re: all task managers reading from all kafka partitions

Posted by Gary Yao <ga...@data-artisans.com>.
Forgot to hit "reply all" in my last email.

On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <ga...@data-artisans.com> wrote:

> Hi Robert,
>
> To get your desired behavior, you should start a single job with
> parallelism set to 4.
>
> Flink does not rely on Kafka's consumer groups to distribute the
> partitions to the parallel subtasks.
> Instead, Flink does the assignment of partitions itself and also tracks
> and checkpoints the offsets internally.
> This is needed to achieve exactly-once semantics.
>
> The group.id that you are setting is used for different purposes, e.g.,
> to track the consumer lag of a job.
>
> Best,
>
> Gary
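
To illustrate the explanation above, here is a minimal sketch in plain Java. It is not Flink's actual code: the class name, the `assign` helper, and the simple modulo rule are assumptions chosen only to show why a job with parallelism 1 ends up reading all 10 partitions, while a single job with parallelism 4 splits them among its subtasks.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper (not a Flink API): returns the partition ids one
    // subtask would read if partitions were spread round-robin over the
    // parallel subtasks of a single job.
    public static List<Integer> assign(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Four independent jobs, each with parallelism 1: every job only has
        // subtask 0, so every job is assigned all 10 partitions.
        for (int job = 0; job < 4; job++) {
            System.out.println("job " + job + ", subtask 0 -> " + assign(10, 1, 0));
        }
        // One job with parallelism 4: the 10 partitions are divided up.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("single job, subtask " + subtask + " -> " + assign(10, 4, subtask));
        }
    }
}
```

Under this assumption, each of the four `-p1` jobs behaves like the "Consumer subtask 0 will start reading the following 10 partitions" log line from the original post, which is why every job sees every message regardless of the shared group.id. Submitting the jar once with a parallelism of 4 (the `-p` option of `flink run`) is the setup suggested above.
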
>
> On Fri, Nov 17, 2017 at 7:54 PM, r. r. <ro...@abv.bg> wrote:
>
>> Hi
>>
>> it's Flink 1.3.2, Kafka 0.10.2.0
>>
>> I am starting 1 JM and 4 TM (with 1 task slot each). Then I deploy 4
>> times (via ./flink run -p1 x.jar), job parallelism is set to 1.
>>
>> A new thing I just noticed: if I start in parallel to the Flink jobs two
>> kafka-console-consumer (with --consumer-property
>> group.id=TopicConsumers) and write a msg to Kafka, then one of the
>> console consumers receives the msg together with both Flink jobs.
>>
>> I thought maybe the Flink consumers didn't receive the group property
>> passed via "flink run .. --group.id TopicConsumers", but no - they do
>> belong to the group as well:
>>
>> taskmanager_3  | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
>> taskmanager_3  |     auto.commit.interval.ms = 5000
>> taskmanager_3  |     auto.offset.reset = latest
>> taskmanager_3  |     bootstrap.servers = [kafka:9092]
>> taskmanager_3  |     check.crcs = true
>> taskmanager_3  |     client.id =
>> taskmanager_3  |     connections.max.idle.ms = 540000
>> taskmanager_3  |     enable.auto.commit = true
>> taskmanager_3  |     exclude.internal.topics = true
>> taskmanager_3  |     fetch.max.bytes = 52428800
>> taskmanager_3  |     fetch.max.wait.ms = 500
>> taskmanager_3  |     fetch.min.bytes = 1
>> taskmanager_3  |     group.id = TopicConsumers
>> taskmanager_3  |     heartbeat.interval.ms = 3000
>> taskmanager_3  |     interceptor.classes = null
>> taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> taskmanager_3  |     max.partition.fetch.bytes = 1048576
>> taskmanager_3  |     max.poll.interval.ms = 300000
>> taskmanager_3  |     max.poll.records = 500
>> taskmanager_3  |     metadata.max.age.ms = 300000
>> taskmanager_3  |     metric.reporters = []
>> taskmanager_3  |     metrics.num.samples = 2
>> taskmanager_3  |     metrics.recording.level = INFO
>> taskmanager_3  |     metrics.sample.window.ms = 30000
>> taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> taskmanager_3  |     receive.buffer.bytes = 65536
>> taskmanager_3  |     reconnect.backoff.ms = 50
>> taskmanager_3  |     request.timeout.ms = 305000
>> taskmanager_3  |     retry.backoff.ms = 100
>> taskmanager_3  |     sasl.jaas.config = null
>> taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
>> taskmanager_3  |     sasl.kerberos.service.name = null
>> taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
>> taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
>> taskmanager_3  |     sasl.mechanism = GSSAPI
>> taskmanager_3  |     security.protocol = PLAINTEXT
>> taskmanager_3  |     send.buffer.bytes = 131072
>> taskmanager_3  |     session.timeout.ms = 10000
>> taskmanager_3  |     ssl.cipher.suites = null
>> taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> taskmanager_3  |     ssl.endpoint.identification.algorithm = null
>> taskmanager_3  |     ssl.key.password = null
>> taskmanager_3  |     ssl.keymanager.algorithm = SunX509
>> taskmanager_3  |     ssl.keystore.location = null
>> taskmanager_3  |     ssl.keystore.password = null
>> taskmanager_3  |     ssl.keystore.type = JKS
>> taskmanager_3  |     ssl.protocol = TLS
>> taskmanager_3  |     ssl.provider = null
>> taskmanager_3  |     ssl.secure.random.implementation = null
>> taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
>> taskmanager_3  |     ssl.truststore.location = null
>> taskmanager_3  |     ssl.truststore.password = null
>> taskmanager_3  |     ssl.truststore.type = JKS
>> taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> taskmanager_3  |
>> taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'topic' was supplied but isn't a known config.
>> taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1
>> taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799
>> taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
>>
>> I'm running Kafka and Flink jobs in docker containers, the
>> console-consumers from localhost
>>
>>  >-------- Original message --------
>>  >From: Gary Yao gary@data-artisans.com
>>  >Subject: Re: all task managers reading from all kafka partitions
>>  >To: "r. r." <ro...@abv.bg>
>>  >Sent: 17.11.2017 20:02
>>
>> > Hi Robert,
>> >
>> > Can you tell us which Flink version you are using?
>> > Also, are you starting a single job with parallelism 4 or are you
>> > starting several jobs?
>> >
>> > Thanks!
>> >
>> > Gary
>> >
>> > On Fri, Nov 17, 2017 at 4:41 PM, r. r. <ro...@abv.bg> wrote:
>> >
>> > Hi
>> >
>> > I have this strange problem: 4 task managers each with one task
>> > slot, attaching to the same Kafka topic which has 10 partitions.
>> >
>> > When I post a single message to the Kafka topic it seems that
>> > all 4 consumers fetch the message and start processing (confirmed by TM
>> > logs).
>> >
>> > If I run kafka-consumer-groups.sh --describe --group
>> > TopicConsumers it says that only one message was posted to a single
>> > partition. Next message would generally go to another partition.
>> >
>> > In addition, while the Flink jobs are running on the message, I
>> > start two kafka-console-consumer.sh and each would get only one message, as
>> > expected.
>> >
>> > On start each of the Flink TM would post something that to me
>> > reads as if it would read from all partitions:
>> >
>> > 2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
>> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
>> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
>>
>> >        2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consu
>> mer.ConsumerConfig              - ConsumerConfig values:
>>
>> >
>>
>> >       auto.commit.interval.ms = 5000
>>
>> >                auto.offset.reset = latest
>>
>> >
>>
>> >
>>
>> >
>>
>> >        Any hints?
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>
>

Re: all task managers reading from all kafka partitions

Posted by "r. r." <ro...@abv.bg>.
Hi 



It's Flink 1.3.2 and Kafka 0.10.2.0.

I am starting 1 JM and 4 TMs (with 1 task slot each). Then I deploy the job 4 
times (via ./flink run -p1 x.jar), so these are four separate jobs, each with parallelism 1.
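
To illustrate why four separately submitted jobs with -p1 could each end up reading all 10 partitions, here is a minimal sketch in plain Java. It assumes a simple round-robin split of partitions among the subtasks of a single job. The class PartitionAssignmentSketch and the method assign are hypothetical names for this illustration and are not part of Flink's API, and the real connector may distribute partitions differently. The point is only that partitions appear to be divided within one job, so a job with a single subtask (index 0) takes everything, which would match the "Consumer subtask 0 will start reading the following 10 partitions" line in the startup log.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of splitting Kafka partitions among the parallel subtasks of ONE job. */
final class PartitionAssignmentSketch {

    /**
     * Returns the partition ids that subtask {@code subtaskIndex} would read,
     * assuming a plain round-robin split (partition % parallelism).
     * This is an illustrative assumption, not Flink's actual code.
     */
    static List<Integer> assign(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // the topic in this thread has 10 partitions

        // Setup from this thread: 4 separate jobs, each submitted with -p1.
        // Every job has exactly one subtask (index 0), so each takes all partitions.
        for (int job = 0; job < 4; job++) {
            System.out.println("job " + job + ", subtask 0 -> " + assign(numPartitions, 1, 0));
        }

        // Alternative: a single job with parallelism 4 divides the partitions.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("single job, subtask " + subtask + " -> "
                    + assign(numPartitions, 4, subtask));
        }
    }
}
```

Under this model, one job submitted with parallelism 4 would give each subtask a disjoint subset of the partitions, whereas four jobs with parallelism 1 each see the whole topic.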



A new thing I just noticed: if I start two kafka-console-consumer instances 
in parallel to the Flink jobs (with --consumer-property 
group.id=TopicConsumers) and write a message to Kafka, then one of the 
console consumers receives the message together with both Flink jobs. 

I thought maybe the Flink consumers didn't receive the group property 
passed via "flink run .. --group.id TopicConsumers", but no, they do 
belong to the group as well:



taskmanager_3  | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: 

taskmanager_3  |     auto.commit.interval.ms = 5000

taskmanager_3  |     auto.offset.reset = latest

taskmanager_3  |     bootstrap.servers = [kafka:9092]

taskmanager_3  |     check.crcs = true

taskmanager_3  |     client.id = 

taskmanager_3  |     connections.max.idle.ms = 540000

taskmanager_3  |     enable.auto.commit = true

taskmanager_3  |     exclude.internal.topics = true

taskmanager_3  |     fetch.max.bytes = 52428800

taskmanager_3  |     fetch.max.wait.ms = 500

taskmanager_3  |     fetch.min.bytes = 1

taskmanager_3  |     group.id = TopicConsumers

taskmanager_3  |     heartbeat.interval.ms = 3000

taskmanager_3  |     interceptor.classes = null

taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

taskmanager_3  |     max.partition.fetch.bytes = 1048576

taskmanager_3  |     max.poll.interval.ms = 300000

taskmanager_3  |     max.poll.records = 500

taskmanager_3  |     metadata.max.age.ms = 300000

taskmanager_3  |     metric.reporters = []

taskmanager_3  |     metrics.num.samples = 2

taskmanager_3  |     metrics.recording.level = INFO

taskmanager_3  |     metrics.sample.window.ms = 30000

taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

taskmanager_3  |     receive.buffer.bytes = 65536

taskmanager_3  |     reconnect.backoff.ms = 50

taskmanager_3  |     request.timeout.ms = 305000

taskmanager_3  |     retry.backoff.ms = 100

taskmanager_3  |     sasl.jaas.config = null

taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit

taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000

taskmanager_3  |     sasl.kerberos.service.name = null

taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05

taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8

taskmanager_3  |     sasl.mechanism = GSSAPI

taskmanager_3  |     security.protocol = PLAINTEXT

taskmanager_3  |     send.buffer.bytes = 131072

taskmanager_3  |     session.timeout.ms = 10000

taskmanager_3  |     ssl.cipher.suites = null

taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

taskmanager_3  |     ssl.endpoint.identification.algorithm = null

taskmanager_3  |     ssl.key.password = null

taskmanager_3  |     ssl.keymanager.algorithm = SunX509

taskmanager_3  |     ssl.keystore.location = null

taskmanager_3  |     ssl.keystore.password = null

taskmanager_3  |     ssl.keystore.type = JKS

taskmanager_3  |     ssl.protocol = TLS

taskmanager_3  |     ssl.provider = null

taskmanager_3  |     ssl.secure.random.implementation = null

taskmanager_3  |     ssl.trustmanager.algorithm = PKIX

taskmanager_3  |     ssl.truststore.location = null

taskmanager_3  |     ssl.truststore.password = null

taskmanager_3  |     ssl.truststore.type = JKS

taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

taskmanager_3  | 

taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'topic' was supplied but isn't a known config.

taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.2.1

taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : e89bffd6b2eff799

taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
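
One plausible reading of the WARN line about 'topic' is that every command-line option is forwarded unchanged into the Kafka consumer properties, which would also explain how group.id arrives there. The sketch below is a plain-Java illustration of that idea under stated assumptions: ArgsToPropertiesSketch and fromArgs are hypothetical names, the job's real argument handling is not shown in this thread, and of the options used here only --group.id is confirmed by the message above (the topic name and broker address are taken from the logs).

```java
import java.util.Properties;

/** Hypothetical helper: turns "--key value" command-line pairs into Properties. */
final class ArgsToPropertiesSketch {

    static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        // Walk the arguments two at a time; a trailing key without a value is ignored.
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            props.setProperty(args[i].substring(2), args[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        // Assumed command line; only --group.id is confirmed by the thread.
        String[] example = {
            "--topic", "TopicToConsume",
            "--bootstrap.servers", "kafka:9092",
            "--group.id", "TopicConsumers"
        };
        Properties props = fromArgs(example);

        // group.id reaches the consumer configuration as intended ...
        System.out.println("group.id = " + props.getProperty("group.id"));
        // ... but so does 'topic', which is not a Kafka consumer setting and
        // would be reported as an unknown config.
        System.out.println("contains 'topic': " + props.containsKey("topic"));
    }
}
```

If the job is built this way, the warning is harmless: the extra key is simply ignored by the Kafka client.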





I'm running Kafka and the Flink jobs in Docker containers; the console consumers run on localhost.







 >-------- Original message --------

 >From: Gary Yao gary@data-artisans.com

 >Subject: Re: all task managers reading from all kafka partitions

 >To: "r. r." <ro...@abv.bg>

 >Sent: 17.11.2017 20:02



 
>      Hi Robert,
>
>      Can you tell us which Flink version you are using?
>
>      Also, are you starting a single job with parallelism 4 or are you starting several jobs?
>
>      Thanks!
>
>      Gary
>
>      On Fri, Nov 17, 2017 at 4:41 PM, r. r. <ro...@abv.bg> wrote:
>
>       Hi
>
>        I have this strange problem: 4 task managers each with one task slot, attaching to the same Kafka topic which has 10 partitions.
>
>        When I post a single message to the Kafka topic it seems that all 4 consumers fetch the message and start processing (confirmed by TM logs).
>
>        If I run kafka-consumer-groups.sh  --describe --group TopicConsumers it says that only one message was posted to a single partition. Next message would generally go to another partition.
>
>        In addition, while the Flink jobs are running on the message, I start two kafka-console-consumer.sh and each would get only one message, as expected.
>
>        On start each of the Flink TM would post something that to me reads as if it would read from all partitions:
>
>        2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
>
>        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
>
>        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
>
>        2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
>
>                auto.commit.interval.ms = 5000
>
>                auto.offset.reset = latest
>
>        Any hints?
>