You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2015/10/01 22:58:27 UTC

[jira] [Updated] (SAMZA-753) BrokerProxy stop should stop the Kafka consumer first

     [ https://issues.apache.org/jira/browse/SAMZA-753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yi Pan (Data Infrastructure) updated SAMZA-753:
-----------------------------------------------
    Fix Version/s: 0.10.0

> BrokerProxy stop should stop the Kafka consumer first
> -----------------------------------------------------
>
>                 Key: SAMZA-753
>                 URL: https://issues.apache.org/jira/browse/SAMZA-753
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yan Fang
>            Assignee: Yan Fang
>             Fix For: 0.10.0
>
>         Attachments: SAMZA-753.2.patch, SAMZA-753.patch
>
>
> Currently, in the BrokerProxy stop method, we only interrupt the BrokerProxy thread, and do not take care of the simple consumer. This causes the following exception when we stop the KafkaSystemConsumer:
> {code}
> 2015-08-04 14:00:34 CoordinatorStreamSystemConsumer [INFO] Stopping coordinator stream system consumer.
> 2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down BrokerProxy for 10.151.111.27:9092
> 2015-08-04 14:00:34 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
> 2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
> 2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
> 2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
> 2015-08-04 14:00:34 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
> 2015-08-04 14:00:34 BrokerProxy [DEBUG] Exception detail:
> java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> 	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> 	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
> 	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
> 	at java.lang.Thread.run(Thread.java:745)
> 2015-08-04 14:00:34 BrokerProxy [DEBUG] Removed [__samza_coordinator_simple-task0715_1,0]
> 2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Abdicating for [__samza_coordinator_simple-task0715_1,0]
> 2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([__samza_coordinator_simple-task0715_1,0] -> 931)
> 2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down due to interrupt.
> {code} 
> We should stop the simple consumer first, then interrupt the BrokerProxy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)