You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2017/02/15 19:55:41 UTC
[jira] [Created] (SAMZA-1088) TestStreamProcessor hangs during job
initialization due to Kafka broker is not available
Yi Pan (Data Infrastructure) created SAMZA-1088:
---------------------------------------------------
Summary: TestStreamProcessor hangs during job initialization due to Kafka broker is not available
Key: SAMZA-1088
URL: https://issues.apache.org/jira/browse/SAMZA-1088
Project: Samza
Issue Type: Bug
Affects Versions: 0.12.0
Reporter: Yi Pan (Data Infrastructure)
On my Mac build, running TestStreamProcessor hangs due to the following issue:
{code}
01:35:13.680 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_OUT
01:35:13.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.678 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on
Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()]
01:35:13.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.683 [ZkClient-EventThread-28-127.0.0.1:53690] ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
01:35:13.686 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.686 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitio
nsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
01:35:13.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.690 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitio
nsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
01:35:13.693 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.693 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
Controller 0]: Broker change listener fired for path /brokers/ids with children 0
01:35:13.697 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.696 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.677 [Test worker] VerifiableProperties [INFO] Verifying properties
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property client.id is overridden to samza_admin-test_job-1
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden to :53693
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,,53693) with correlation id 0 for
1 topic(s) Set(numbers)
01:35:17.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Connected to :53693 for producing
01:35:17.681 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
01:35:17.683 [DEBUG] [TestEventLogger] java.nio.channels.ClosedChannelException
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
01:35:17.683 [DEBUG] [TestEventLogger] at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
01:35:17.683 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.686 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.686 [DEBUG] [TestEventLogger] at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
01:35:17.686 [DEBUG] [TestEventLogger] at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
01:35:17.688 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
01:35:17.688 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.689 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
01:35:17.689 [DEBUG] [TestEventLogger] at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
01:35:17.689 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.690 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.690 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.690 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
01:35:17.690 [DEBUG] [TestEventLogger] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
01:35:17.691 [DEBUG] [TestEventLogger] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
01:35:17.691 [DEBUG] [TestEventLogger] at java.lang.Thread.run(Thread.java:745)
01:35:17.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.681 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.691 [DEBUG] [TestEventLogger]
01:35:17.691 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_ERROR
01:35:17.691 [DEBUG] [TestEventLogger] 113333 [Test worker] WARN org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets for streams [numbers] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(numbers)] from broker [ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
{code}
Turns out that the initialization of the job failed during the fetchMetadata for the input topics. It keeps re-trying due to failed connection to the broker. At that moment, the broker can not be connected to.
Note that this only happens in Mac and when running the test via gradle:
{noformat}
./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)