Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2015/08/04 03:31:05 UTC

[jira] [Commented] (SAMZA-723) hello-samza hangs when we use StreamAppender

    [ https://issues.apache.org/jira/browse/SAMZA-723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652918#comment-14652918 ] 

Yan Fang commented on SAMZA-723:
--------------------------------

Hmm, I looked at the stack trace and got a different result:

{code}
"main" prio=5 tid=0x00007fcc2a001000 nid=0x1903 waiting on condition [0x0000000105b5e000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007f1bfe270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at org.apache.samza.util.BlockingEnvelopeMap.poll(BlockingEnvelopeMap.java:135)
	at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:80)
	at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:57)
	at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.bootstrap(CoordinatorStreamSystemConsumer.java:141)
	at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:68)
	at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:77)
	at org.apache.samza.coordinator.JobCoordinator.apply(JobCoordinator.scala)
	at org.apache.samza.logging.log4j.StreamAppender.getConfig(StreamAppender.java:183)
	at org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:94)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
	at org.apache.log4j.xml.DOMConfigurator.findAppenderByName(DOMConfigurator.java:176)
	at org.apache.log4j.xml.DOMConfigurator.findAppenderByReference(DOMConfigurator.java:191)
	at org.apache.log4j.xml.DOMConfigurator.parseChildrenOfLoggerElement(DOMConfigurator.java:523)
	at org.apache.log4j.xml.DOMConfigurator.parseRoot(DOMConfigurator.java:492)
	- locked <0x00000007f4090ec8> (a org.apache.log4j.spi.RootLogger)
	at org.apache.log4j.xml.DOMConfigurator.parse(DOMConfigurator.java:1001)
	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:867)
	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:773)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:483)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
	- locked <0x00000007f40910e8> (a org.slf4j.impl.Log4jLoggerFactory)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
	at org.apache.samza.util.Logging$class.logger(Logging.scala:27)
	at org.apache.samza.job.yarn.SamzaAppMaster$.logger$lzycompute(SamzaAppMaster.scala:56)
	- locked <0x00000007f4091110> (a org.apache.samza.job.yarn.SamzaAppMaster$)
	at org.apache.samza.job.yarn.SamzaAppMaster$.logger(SamzaAppMaster.scala:56)
	at org.apache.samza.util.Logging$class.info(Logging.scala:54)
	at org.apache.samza.job.yarn.SamzaAppMaster$.info(SamzaAppMaster.scala:56)
	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:64)
	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)

"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 10.151.99.45:9092 for client samza_consumer-simple_task0715-1-1438649236052-1" daemon prio=5 tid=0x00007fcc298ad800 nid=0x6003 in Object.wait() [0x000000010e96a000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.log4j.LogManager.getLoggerRepository(LogManager.java:196)
	at org.apache.log4j.LogManager.getLogger(LogManager.java:228)
	at org.apache.log4j.Logger.getLogger(Logger.java:104)
	at kafka.utils.Logging$class.logger(Logging.scala:24)
	at kafka.network.BoundedByteBufferSend.logger$lzycompute(BoundedByteBufferSend.scala:26)
	- locked <0x00000007f202b150> (a kafka.network.BoundedByteBufferSend)
	at kafka.network.BoundedByteBufferSend.logger(BoundedByteBufferSend.scala:26)
	at kafka.utils.Logging$class.trace(Logging.scala:35)
	at kafka.network.BoundedByteBufferSend.trace(BoundedByteBufferSend.scala:26)
	at kafka.network.Send$class.writeCompletely(Transmission.scala:76)
	at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:70)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
	- locked <0x00000007f1d27240> (a java.lang.Object)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
	at java.lang.Thread.run(Thread.java:745)

{code}

There is no specific lock here... I can't even find the kafka-producer-network-thread thread...
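One possible reading of the two dumps, offered as an assumption and not as a conclusion reached in this thread: the hidden lock is the JVM's class-initialization lock for org.apache.log4j.LogManager. The main thread is still inside LogManager.<clinit>, because configuring StreamAppender bootstraps the coordinator stream and waits in BlockingEnvelopeMap.poll. Meanwhile the BrokerProxy thread that would deliver those messages sits at LogManager.getLoggerRepository, waiting for that same class initialization to finish. The minimal Java sketch below reproduces the pattern with hypothetical stand-ins (FakeLogManager, Fetcher, a single timed poll); none of these names are Samza or log4j APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ClassInitHangSketch {

    // Hypothetical stand-in for the envelope queue that the coordinator-stream
    // bootstrap polls (BlockingEnvelopeMap in the dump above).
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Hypothetical stand-in for the fetch thread (BrokerProxy in the dump).
    static class Fetcher implements Runnable {
        @Override
        public void run() {
            // Like kafka.utils.Logging calling LogManager.getLogger(): this static
            // call has to wait until FakeLogManager's static initializer finishes
            // on the other thread. No synchronized block is involved; the JVM's
            // class-initialization protocol does the blocking.
            FakeLogManager.touch();
            queue.offer("coordinator-message");
        }
    }

    // Hypothetical stand-in for LogManager: its static initializer ends up
    // waiting for data that only the fetch thread can deliver.
    static class FakeLogManager {
        static final String bootstrapped;

        static {
            Thread fetcher = new Thread(new Fetcher(), "fetcher");
            fetcher.setDaemon(true);
            fetcher.start();
            String msg = null;
            try {
                // A single timed wait, so this sketch gives up after 2 s; a
                // bootstrap loop that keeps re-polling would never get out.
                msg = queue.poll(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            bootstrapped = msg;
        }

        static void touch() {
            // Empty: merely invoking it forces the class-initialization check.
        }
    }

    // First access triggers FakeLogManager's static initializer on this thread.
    static String bootstrapResult() {
        return FakeLogManager.bootstrapped;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String result = bootstrapResult();
        long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("bootstrapped = " + result + " after " + ms + " ms");
    }
}
```

Running main prints `bootstrapped = null` after roughly two seconds: the fetcher cannot deliver anything until the initializer returns, and the initializer is waiting for the fetcher. On the JDKs of that era, a dump taken during the wait typically shows the blocked thread as RUNNABLE "in Object.wait()" with no "waiting to lock" entry, which is how the BrokerProxy thread above looks.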

> hello-samza hangs when we use StreamAppender
> --------------------------------------------
>
>                 Key: SAMZA-723
>                 URL: https://issues.apache.org/jira/browse/SAMZA-723
>             Project: Samza
>          Issue Type: Bug
>          Components: hello-samza
>    Affects Versions: 0.10.0
>            Reporter: Navina Ramesh
>            Assignee: Yan Fang
>             Fix For: 0.10.0
>
>
> I added StreamAppender to log4j.xml, set task.log4j.system=kafka in the config, and added a simple log line in the process call to see whether it creates the logging event correctly. 
> When I deploy the job (WikipediaFeedStreamTask.java), the AppMaster seems to just hang in the bootstrap method. I don’t get any exceptions in the AppMaster logs. I checked the Kafka logs and found some socket-closed errors with the following stack traces:
> ==> deploy/kafka/logs/kafka.log <==
> [2015-06-25 16:48:01,179] ERROR Closing socket for /172.21.132.57 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
> at kafka.network.Processor.write(SocketServer.scala:472)
> at kafka.network.Processor.run(SocketServer.scala:342)
> at java.lang.Thread.run(Thread.java:745)
> ==> deploy/kafka/logs/server.log <==
> [2015-06-25 16:48:01,179] ERROR Closing socket for /172.21.132.57 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
> at kafka.network.Processor.write(SocketServer.scala:472)
> at kafka.network.Processor.run(SocketServer.scala:342)
> at java.lang.Thread.run(Thread.java:745)
> Need to investigate this issue further. It looks like the CoordinatorStream is conflicting with the StreamAppender components.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)