Posted to commits@samza.apache.org by "Jake Maes (JIRA)" <ji...@apache.org> on 2018/01/12 19:27:05 UTC

[jira] [Updated] (SAMZA-1088) TestStreamProcessor hangs during job initialization due to Kafka broker is not available

     [ https://issues.apache.org/jira/browse/SAMZA-1088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-1088:
-----------------------------
    Fix Version/s:     (was: 0.14.0)
                   0.15.0

> TestStreamProcessor hangs during job initialization due to Kafka broker is not available
> ----------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1088
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1088
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Yi Pan (Data Infrastructure)
>             Fix For: 0.15.0
>
>
> On my Mac build, running TestStreamProcessor hangs due to the following issue:
> {code}
> 01:35:13.680 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_OUT
> 01:35:13.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.678 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()]
> 01:35:13.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.683 [ZkClient-EventThread-28-127.0.0.1:53690] ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
> 01:35:13.686 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.686 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
> 01:35:13.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.690 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
> 01:35:13.693 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.693 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0
> 01:35:13.697 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.696 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
> 01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.677 [Test worker] VerifiableProperties [INFO] Verifying properties
> 01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property client.id is overridden to samza_admin-test_job-1
> 01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden to :53693
> 01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
> 01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,,53693) with correlation id 0 for 1 topic(s) Set(numbers)
> 01:35:17.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Connected to :53693 for producing
> 01:35:17.681 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Disconnecting from :53693
> 01:35:17.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
> 01:35:17.683 [DEBUG] [TestEventLogger]     java.nio.channels.ClosedChannelException
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
> 01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
> 01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
> 01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
> 01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
> 01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
> 01:35:17.689 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> 01:35:17.690 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 01:35:17.691 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 01:35:17.691 [DEBUG] [TestEventLogger]          at java.lang.Thread.run(Thread.java:745)
> 01:35:17.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.681 [Test worker] SyncProducer [INFO] Disconnecting from :53693
> 01:35:17.691 [DEBUG] [TestEventLogger] 
> 01:35:17.691 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_ERROR
> 01:35:17.691 [DEBUG] [TestEventLogger]     113333 [Test worker] WARN org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets for streams [numbers] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(numbers)] from broker [ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
> {code}
> It turns out that job initialization fails while fetching metadata for the input topics. The connection to the broker fails, so the fetch keeps retrying, which is why the test hangs. At that point the broker cannot be reached.
> Note that this happens only on Mac, and only when running the test via Gradle:
> {noformat}
> ./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
> {noformat}
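
The retry behaviour described above can be illustrated with a small sketch. It assumes the hang comes from a retry loop with no attempt limit, and shows a bounded exponential-backoff retry that gives up with an error instead. `BoundedRetry`, its `call` method, and all parameter names are hypothetical; this is not Samza's `ExponentialSleepStrategy` and not a proposed patch.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not part of Samza or Kafka. */
final class BoundedRetry {
    private BoundedRetry() {}

    /**
     * Runs the operation until it succeeds or maxAttempts is reached,
     * sleeping between attempts with a delay that doubles up to maxDelayMs.
     */
    public static <T> T call(Callable<T> operation, int maxAttempts,
                             long initialDelayMs, long maxDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                // Remember the failure so it can be reported if we give up.
                last = e;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
        // Surface the failure instead of retrying forever.
        throw new IllegalStateException(
            "gave up after " + maxAttempts + " attempts", last);
    }
}
```

With a wrapper of this shape around the metadata fetch, an unreachable broker would make the test fail with the last exception attached as the cause rather than hang. It would not explain why the broker is unreachable on Mac under Gradle.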


