You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2017/02/15 19:55:41 UTC

[jira] [Created] (SAMZA-1088) TestStreamProcessor hangs during job initialization due to Kafka broker is not available

Yi Pan (Data Infrastructure) created SAMZA-1088:
---------------------------------------------------

             Summary: TestStreamProcessor hangs during job initialization due to Kafka broker is not available
                 Key: SAMZA-1088
                 URL: https://issues.apache.org/jira/browse/SAMZA-1088
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.12.0
            Reporter: Yi Pan (Data Infrastructure)


On my Mac build, running TestStreamProcessor hangs due to the following issue:
{code}
01:35:13.680 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_OUT
01:35:13.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.678 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on
 Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()]
01:35:13.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.683 [ZkClient-EventThread-28-127.0.0.1:53690] ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
01:35:13.686 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.686 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitio
nsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
01:35:13.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.690 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitio
nsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
01:35:13.693 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.693 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
 Controller 0]: Broker change listener fired for path /brokers/ids with children 0
01:35:13.697 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.696 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
 Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.677 [Test worker] VerifiableProperties [INFO] Verifying properties
01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property client.id is overridden to samza_admin-test_job-1
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden to :53693
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,,53693) with correlation id 0 for
 1 topic(s) Set(numbers)
01:35:17.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Connected to :53693 for producing
01:35:17.681 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
01:35:17.683 [DEBUG] [TestEventLogger]     java.nio.channels.ClosedChannelException
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.686 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.689 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
01:35:17.689 [DEBUG] [TestEventLogger]          at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
01:35:17.689 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.690 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
01:35:17.690 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
01:35:17.691 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
01:35:17.691 [DEBUG] [TestEventLogger]          at java.lang.Thread.run(Thread.java:745)
01:35:17.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.681 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.691 [DEBUG] [TestEventLogger] 
01:35:17.691 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_ERROR
01:35:17.691 [DEBUG] [TestEventLogger]     113333 [Test worker] WARN org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets for streams [numbers] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(numbers)] from broker [ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
{code}

Turns out that the initialization of the job failed during the fetchMetadata for the input topics. It keeps re-trying due to failed connection to the broker. At that moment, the broker can not be connected to.

Note that this only happens in Mac and when running the test via gradle:
{noformat}
./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
{noformat}





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)