Posted to commits@samza.apache.org by "Shanthoosh Venkataraman (JIRA)" <ji...@apache.org> on 2018/04/11 20:54:00 UTC
[jira] [Updated] (SAMZA-1647) Fix NPE in JobModelExpired event handler.
[ https://issues.apache.org/jira/browse/SAMZA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shanthoosh Venkataraman updated SAMZA-1647:
-------------------------------------------
Description:
*Problem:*
There's a minor race condition in container shutdown triggered from the onJobModelExpired event handler. This race condition leads to an NPE and eventually kills the StreamProcessor.
*Reason:*
The JobModelExpired event handler runs the following steps on the DebounceThread:
1. Invoke container shutdown.
2. Wait up to the configured task.shutdown.ms on the countDownLatch.
3. Log the containerId once the wait returns.
The onContainerFailed event handler runs the following steps on the ContainerThread:
1. Count down the countDownLatch.
2. Set the container field to null.
If step 2 of the onContainerFailed handler runs before step 3 of the JobModelExpired handler, the container field is already null when the JobModelExpired handler reads it to log the containerId. This results in an NPE.
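The interleaving described above can be reproduced with a small, self-contained sketch. It is not the Samza code or the actual patch: ShutdownRaceSketch, Container, shutdownLatch and both handler method bodies are hypothetical stand-ins, and the join inside shutdown() exists only to force the losing interleaving so the demo is deterministic. The "safe" variant shows one possible fix, which is to read the container field once into a local variable and use only that snapshot afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownRaceSketch {

    /** Hypothetical stand-in for the container; not Samza's SamzaContainer. */
    static final class Container {
        private final String id;
        private final Runnable onFailed;

        Container(String id, Runnable onFailed) {
            this.id = id;
            this.onFailed = onFailed;
        }

        String getId() {
            return id;
        }

        /**
         * Runs the failure handler on a separate thread and joins it.
         * The join forces the losing interleaving (assumption for the demo).
         */
        void shutdown() {
            Thread containerThread = new Thread(onFailed, "container-thread");
            containerThread.start();
            try {
                containerThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile Container container;

    ShutdownRaceSketch() {
        this.container = new Container("0000000000", this::onContainerFailed);
    }

    /** ContainerThread side: count down the latch, then clear the field. */
    void onContainerFailed() {
        shutdownLatch.countDown();
        container = null;
    }

    /** DebounceThread side as described: re-reads the field after the wait. */
    String onJobModelExpiredUnsafe(long timeoutMs) {
        container.shutdown();
        boolean complete = awaitShutdown(timeoutMs);
        // The field is null by now, so this dereference throws the NPE.
        return "container=" + container.getId() + "; complete=" + complete;
    }

    /** Possible fix: read the field once and use only the local snapshot. */
    String onJobModelExpiredSafe(long timeoutMs) {
        Container snapshot = container;
        if (snapshot == null) {
            return "container already stopped";
        }
        snapshot.shutdown();
        boolean complete = awaitShutdown(timeoutMs);
        return "container=" + snapshot.getId() + "; complete=" + complete;
    }

    /** Waits on the latch; returns false on timeout or interruption. */
    private boolean awaitShutdown(long timeoutMs) {
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        try {
            new ShutdownRaceSketch().onJobModelExpiredUnsafe(1000);
        } catch (NullPointerException e) {
            System.out.println("unsafe handler: NullPointerException");
        }
        System.out.println("safe handler: "
                + new ShutdownRaceSketch().onJobModelExpiredSafe(1000));
    }
}
```

In a real processor the two handlers run concurrently, so the failure is timing-dependent. A local snapshot removes the null dereference without adding a lock, but whether that matches the change made for this issue is not stated here.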
Relevant stacktrace:
{code:java}
[p-0000000000-container-thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container failed. Stopping the processor.
org.apache.samza.system.SystemProducerException: Flush failed. One or more batches of messages were not sent!
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: TaskName-SystemStreamPartition [TestSystemName, test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on System:TestSystemName Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895-0: 30003 ms has passed since batch creation plus linger time
[p-0000000000-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - ShutdownComplete=true
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down container done for pid=0000000000; complete =true
[debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Encountered errors during job coordinator stop.
java.lang.NullPointerException
at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:235)
at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:163)
at org.apache.samza.zk.ZkJobCoordinator$ZkSessionStateChangedListener.lambda$handleStateChanged$0(ZkJobCoordinator.java:449)
at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
was:
*Problem:*
There's a minor race condition in container shutdown triggered from the onJobModelExpired event handler. This race condition leads to an NPE and eventually kills the StreamProcessor.
*Reason:*
The JobModelExpired event handler runs the following steps on the DebounceThread:
1. Invoke container shutdown.
2. Wait up to the configured task.shutdown.ms on the countDownLatch.
3. Log the containerId once the wait returns.
The onContainerFailed event handler runs the following steps on the ContainerThread:
1. Count down the countDownLatch.
2. Set the container field to null.
If step 2 of the onContainerFailed handler runs before step 3 of the JobModelExpired handler, the container field is already null when the JobModelExpired handler reads it to log the containerId. This results in an NPE.
Relevant stacktrace:
{code:java}
[p-0000000000-container-thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container failed. Stopping the processor.
org.apache.samza.system.SystemProducerException: Flush failed. One or more batches of messages were not sent!
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: TaskName-SystemStreamPartition [TestSystemName, test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on System:TestSystemName Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895 Partition:null
at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895-0: 30003 ms has passed since batch creation plus linger time
[p-0000000000-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - ShutdownComplete=true
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down container done for pid=0000000000; complete =true
[debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Encountered errors during job coordinator stop.
java.lang.NullPointerException
at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:235)
at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:163)
at org.apache.samza.zk.ZkJobCoordinator$ZkSessionStateChangedListener.lambda$handleStateChanged$0(ZkJobCoordinator.java:449)
at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
> Fix NPE in JobModelExpired event handler.
> -----------------------------------------
>
> Key: SAMZA-1647
> URL: https://issues.apache.org/jira/browse/SAMZA-1647
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>