Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/02/21 23:35:24 UTC

[GitHub] [samza] PanTheMan opened a new pull request #1287: SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close()

URL: https://github.com/apache/samza/pull/1287
 
 
   Symptom: When deploying a job in dev, users repeatedly see the following error message in their logs:
   ```
   2019-09-11 14:39:37.193 [Finalizer] [] LifecycleAwareConsumer [ERROR] kafka consumer allocated and not closed (see attached exception for point of allocation)
   java.lang.Throwable
           at com.linkedin.kafka.linkedinclients.decorators.LifecycleAwareConsumer.<init>(LifecycleAwareConsumer.java:54)
           at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:72)
           at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:44)
           at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getKafkaConsumer(SamzaRawLiKafkaSystemFactory.java:135)
           at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getAdmin(SamzaRawLiKafkaSystemFactory.java:104)
           at org.apache.samza.config.SystemConfig.lambda$getSystemAdmins$0(SystemConfig.java:97)
           at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
           at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
           at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
           at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
           at org.apache.samza.config.SystemConfig.getSystemAdmins(SystemConfig.java:95)
           at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
           at org.apache.samza.job.JobRunner.run(JobRunner.scala:82)
           at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
           at java.util.ArrayList.forEach(ArrayList.java:1257)
           at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
           at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:54)
           at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
   2019-09-11 14:39:37.193 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] KafkaProducer [INFO] [Producer clientId=kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
   2019-09-11 14:39:37.194 [Finalizer] [] LiKafkaConsumerImpl [INFO] Shutting down in PT1S...
   2019-09-11 14:39:37.194 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] AbstractAuditor [ERROR] Auditor encounter exception.
   org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1168)
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1126)
           at com.linkedin.kafka.linkedinclients.auditing.TmeAuditor.onClosed(TmeAuditor.java:366)
           at com.linkedin.kafka.clients.auditing.abstractimpl.AbstractAuditor.run(AbstractAuditor.java:157)
   Caused by: java.lang.InterruptedException
           at java.lang.Object.wait(Native Method)
           at java.lang.Thread.join(Thread.java:1252)
           at java.lang.Thread.join(Thread.java:1326)
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1166)
           ... 3 more
   ```
   This error message appears multiple times in a job's logs but does not actually affect the job's performance or correctness. As a result, users may wrongly attribute unrelated job failures to this message.
   
   Cause:
   The Kafka consumers in use here have a `finalize()` method (note the `[Finalizer]` thread in the log above). When the garbage collector determines that a consumer is no longer referenced, the finalizer closes it and, because the consumer was never closed explicitly, logs the error above. For ProcessJob, ProcessJobFactory initializes a CoordinatorStreamStore and wraps it in a NameSpaceAwareCoordinatorStreamStore. When the ProcessJob is done, `close()` is called on the NameSpaceAwareCoordinatorStreamStore, but this does not close the underlying CoordinatorStreamStore. Once nothing references the CoordinatorStreamStore anymore, its consumer is left for the garbage collector, and the finalize method reports the error above.
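   
   A minimal Java sketch of the ownership problem described in the cause, assuming a simplified store contract. `Store`, `UnderlyingStore` and `NamespacedView` are hypothetical stand-ins for CoordinatorStreamStore and NameSpaceAwareCoordinatorStreamStore, not Samza's actual classes; the wrapper's `close()` mirrors the described behavior by not closing its delegate.
   
   ```java
   // Hypothetical stand-ins for CoordinatorStreamStore and
   // NameSpaceAwareCoordinatorStreamStore; not the real Samza classes.
   public class WrapperCloseSketch {
   
     /** Minimal store contract: owns resources and can be closed. */
     public interface Store extends AutoCloseable {
       @Override
       void close();
     }
   
     /** Stand-in for the underlying store that owns the Kafka consumer. */
     public static class UnderlyingStore implements Store {
       private boolean closed = false;
   
       @Override
       public void close() {
         // In the real system, this is where the consumer would be released.
         closed = true;
       }
   
       public boolean isClosed() {
         return closed;
       }
     }
   
     /** Stand-in for a namespace-scoped wrapper around the underlying store. */
     public static class NamespacedView implements Store {
       private final Store delegate;
   
       public NamespacedView(Store delegate) {
         this.delegate = delegate;
       }
   
       public Store delegate() {
         return delegate;
       }
   
       @Override
       public void close() {
         // Mirrors the behavior described above: closing the wrapper
         // does not close the delegate it wraps.
       }
     }
   
     /** Pre-fix flow: only the wrapper is closed; returns the underlying store for inspection. */
     public static UnderlyingStore closeOnlyTheWrapper() {
       UnderlyingStore underlying = new UnderlyingStore();
       Store view = new NamespacedView(underlying);
       view.close();
       return underlying;
     }
   
     public static void main(String[] args) {
       UnderlyingStore underlying = closeOnlyTheWrapper();
       System.out.println("underlying closed? " + underlying.isClosed()); // prints "underlying closed? false"
     }
   }
   ```
   
   The store that actually owns the consumer is still open after the wrapper is closed; once the caller drops its last reference, cleanup is left to the garbage collector.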
   
   Changes: ProcessJob now takes the CoordinatorStreamStore as an additional parameter and calls `close()` on the store when the job is done running. This keeps a reference to the store for as long as the ProcessJob runs and ensures the store is closed explicitly instead of being left to the finalizer. TestProcessJob is also updated to verify that the store is closed properly.
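   
   A sketch of the fix, assuming a simplified job that is handed both its work and the underlying store. `SketchJob`, `ClosableStore`, `FakeStore` and `runAndClose()` are hypothetical names for illustration, not Samza's ProcessJob API.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical sketch: names are illustrative, not Samza's ProcessJob API.
   public class ProcessJobCloseSketch {
   
     /** Store contract whose close() does not throw checked exceptions. */
     public interface ClosableStore extends AutoCloseable {
       @Override
       void close();
     }
   
     /** Stand-in for the underlying CoordinatorStreamStore; records whether it was closed. */
     public static class FakeStore implements ClosableStore {
       private final AtomicBoolean closed = new AtomicBoolean(false);
   
       @Override
       public void close() {
         closed.set(true);
       }
   
       public boolean isClosed() {
         return closed.get();
       }
     }
   
     /** Stand-in for ProcessJob that now also receives the store it is responsible for. */
     public static class SketchJob {
       private final Runnable process;     // the work the job runs
       private final ClosableStore store;  // referenced for the job's whole lifetime
   
       public SketchJob(Runnable process, ClosableStore store) {
         this.process = process;
         this.store = store;
       }
   
       /** Runs the process, then closes the store even if the process throws. */
       public void runAndClose() {
         try {
           process.run();
         } finally {
           store.close();
         }
       }
     }
   
     public static void main(String[] args) {
       FakeStore store = new FakeStore();
       new SketchJob(() -> System.out.println("job running"), store).runAndClose();
       System.out.println("store closed? " + store.isClosed()); // prints "store closed? true"
     }
   }
   ```
   
   Closing in a `finally` block is a choice made for this sketch so the store is also closed when the process fails; the change above only states that `close()` is called when the job is done running.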
   
   Tests: To reproduce this error, deploy any job on the latest version of Samza and check the .out log file.
   --
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [samza] PanTheMan closed pull request #1287: SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close()

Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/samza/pull/1287