Posted to commits@samza.apache.org by "Zihao Zhang (JIRA)" <ji...@apache.org> on 2018/04/30 22:35:00 UTC

[jira] [Commented] (SAMZA-1371) Some Samza Containers get stuck at "Starting BrokerProxy for hostname:portnum" while others seem to be fine

    [ https://issues.apache.org/jira/browse/SAMZA-1371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459203#comment-16459203 ] 

Zihao Zhang commented on SAMZA-1371:
------------------------------------

Hi [~nickpan47]

I'm a colleague of Hao Song and I'm working on this issue now. According to our investigation, some containers get stuck because, in the BrokerProxy thread, the SimpleConsumer class (package kafka.consumer) used by DefaultFetchSimpleConsumer (package org.apache.samza.system.kafka) cannot receive any message for a given partition at a specific offset. The stuck offset lies between the first valid offset and the latest valid offset for that partition, so the consumer keeps sending fetch requests and trying to receive a message in an infinite loop. The stuck offset for a given partition does not change across multiple restarts.

My remote debugging indicated that the input stream feeding this consumer (invoked by NetworkReceive#readFromReadableChannel()) contains only some metadata with the topic/partition/offset info, rather than a valid message for the given offset.

I've verified that the message at the specific offset is still valid. To do this, I wrote a small tool to fetch messages directly from the brokers. When I tried to fetch the message at the stuck offset, it led to the same result: a timeout before receiving any message. Then I bumped the version of kafka-clients to 0.11.0.2 and tried again. This time, the consumer skipped the stuck offset and returned the following x messages as requested.
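To make the two behaviors concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: it uses no Kafka or Samza classes, and every name in it (ToyPartition, readRetrying, readSkipping, the maxAttempts bound) is hypothetical and not taken from the tool mentioned above. It models a partition in which offset 2 never yields a message, and compares a reader that keeps re-requesting the same offset with one that moves on to the next readable offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class StuckOffsetDemo {

    /** Toy stand-in for one partition; NOT a Kafka class (hypothetical). */
    static final class ToyPartition {
        // offset -> payload; an offset with no entry models a fetch that
        // comes back without a usable message.
        private final TreeMap<Long, String> messages = new TreeMap<>();

        void put(long offset, String payload) {
            messages.put(offset, payload);
        }

        /** Message stored at exactly this offset, or null if the fetch yields nothing. */
        String fetchExact(long offset) {
            return messages.get(offset);
        }

        /** First offset >= the requested one that holds a message, or -1 if none. */
        long nextReadable(long offset) {
            Long key = messages.ceilingKey(offset);
            return key == null ? -1L : key;
        }
    }

    /** Partition with messages at offsets 0, 1, 3, 4; offset 2 is the "stuck" one. */
    static ToyPartition demoPartition() {
        ToyPartition p = new ToyPartition();
        for (long offset : new long[] {0, 1, 3, 4}) {
            p.put(offset, "m" + offset);
        }
        return p;
    }

    /**
     * Re-requests the same offset whenever a fetch yields nothing.
     * maxAttempts is an assumption added so the demo terminates; without
     * such a bound this loop would spin forever on the stuck offset.
     */
    static List<String> readRetrying(ToyPartition p, long start, int count, int maxAttempts) {
        List<String> out = new ArrayList<>();
        long offset = start;
        int attempts = 0;
        while (out.size() < count && attempts < maxAttempts) {
            String msg = p.fetchExact(offset);
            if (msg == null) {
                attempts++; // the same offset is requested again on the next pass
                continue;
            }
            out.add(msg);
            offset++;
            attempts = 0;
        }
        return out;
    }

    /** Jumps to the next readable offset instead of retrying the stuck one. */
    static List<String> readSkipping(ToyPartition p, long start, int count) {
        List<String> out = new ArrayList<>();
        long offset = start;
        while (out.size() < count) {
            long next = p.nextReadable(offset);
            if (next < 0) {
                break; // nothing left to read
            }
            out.add(p.fetchExact(next));
            offset = next + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        ToyPartition p = demoPartition();
        System.out.println(readRetrying(p, 0, 4, 3)); // prints [m0, m1]
        System.out.println(readSkipping(p, 0, 4));    // prints [m0, m1, m3, m4]
    }
}
```

The retrying reader never gets past offset 2, which mirrors the symptom described above; the skipping reader returns the messages that follow it. Whether the newer client skips for the same reason as this toy model is not something the sketch can show.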

Due to some build issues, I haven't had a chance to try the newest version by pulling from master. But according to my investigation, there's a good chance that a newer version of the Kafka client would help solve this stuck-container issue. I'm wondering:
1. Do you have any other suggestions or ideas on this?
2. Do you have a timeline for releasing a new Samza version with a kafka-clients dependency of 0.11?

Thank you.

> Some Samza Containers get stuck at "Starting BrokerProxy for hostname:portnum" while others seem to be fine
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1371
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1371
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.11.0, 0.12.0
>         Environment: Samza version: 0.11, 0.12
> Kafka version: 0.11.0.0
>            Reporter: Ak Ka
>            Priority: Blocker
>         Attachments: stdout.log, thread_dump.txt
>
>
> We have multiple Samza apps using local store that have this issue. Some containers get stuck on "Starting BrokerProxy for hostname:portnum" while others seem to work as expected.  
> Here is the log:
> stuck:
> ```
> [...]
> 2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO] Validating offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
> 2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]. Using it to instantiate consumer.
> 2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Starting BrokerProxy for hostname:portnum
> // it's dead, Jim
> ```
> healthy:
> ```
> [...]
> 2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO] Validating offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]. Using it to instantiate consumer.
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Starting BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
> 2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 2017-07-25 17:11:29.244 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Abdicating for [prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1]
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Refreshing brokers for: Map([prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1] -> 13572)
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
> 2017-07-25 17:11:29.247 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.248 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
> 2017-07-25 17:11:29.265 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
> 2017-07-25 17:11:29.523 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
> 2017-07-25 17:11:29.601 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
> 2017-07-25 17:11:29.663 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
> 2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer [INFO] Starting host statistics monitor
> 2017-07-25 17:11:29.670 [main] org.apache.samza.container.SamzaContainer [INFO] Registering task instances with producers.
> 2017-07-25 17:11:29.674 [main] org.apache.samza.container.SamzaContainer [INFO] Starting producer multiplexer.
> 2017-07-25 17:11:29.675 [main] org.apache.samza.container.SamzaContainer [INFO] Initializing stream tasks.
> 2017-07-25 17:11:29.676 [main] com.company.samza.app.companyStreamingAppWrapper [INFO] Initializing instance of streaming application
> 2017-07-25 17:11:29.681 [main] com.company.samza.app.companyStreamingAppWrapper [INFO] First initialization. Setting up Guice container with configuration companyStreamingAppWrapperConfiguration{company.app.name=AlertsOrganizerInstant, company.appgroup=aws, company.env=prod, company.guice.module=com.company.notifications.Alerts.organizer..AlertsOrganizerModule}
> 2017-07-25 17:11:30.118 [main] com.company.config.guice.configModule [INFO] configModule loaded requested override file '/storage/data/secure/config/AnalyticsServiceClient.cfg'
> 2017-07-25 17:11:30.480 [main] com.company.samza.dataService.SamzaSessionFactoriesModule [INFO] Loading prod dbConfig from /data/config/prod.database.properties
> // Hibernate stuff (i.e. our code is hit)
> ```


