Posted to commits@samza.apache.org by "Xinyu Liu (JIRA)" <ji...@apache.org> on 2018/11/01 17:33:00 UTC

[jira] [Commented] (SAMZA-1638) Recreate SystemProducer on KafkaCheckpointManager.writeCheckpoint failure.

    [ https://issues.apache.org/jira/browse/SAMZA-1638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671913#comment-16671913 ] 

Xinyu Liu commented on SAMZA-1638:
----------------------------------

This happened in one of the LinkedIn jobs. The log:

2018-10-24 20:25:22.370 [main] AsyncRunLoop [ERROR] Task Partition 886 commit failed
org.apache.samza.SamzaException: Exception when writing checkpoint: Checkpoint [offsets={}] for task: Partition 886.
	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$writeCheckpoint$2.apply(KafkaCheckpointManager.scala:173)
	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$writeCheckpoint$2.apply(KafkaCheckpointManager.scala:170)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:89)
	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.writeCheckpoint(KafkaCheckpointManager.scala:162)
	at org.apache.samza.checkpoint.OffsetManager.writeCheckpoint(OffsetManager.scala:260)
	at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:223)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker$5.run(AsyncRunLoop.java:531)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.commit(AsyncRunLoop.java:550)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:416)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:341)
	at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:232)
	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:166)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:787)
	at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
	at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:113)
	at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:48)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
	at com.linkedin.comlin.ComlinFeedInteractionsJoin.run(ComlinFeedInteractionsJoin.java:82)
	at com.linkedin.comlin.ComlinFeedInteractionsJoin.main(ComlinFeedInteractionsJoin.java:52)
Caused by: org.apache.samza.system.SystemProducerException: Producer was unable to recover from previous exception.
	at com.linkedin.samza.system.kafka.SamzaLiKafkaSystemProducer.send(SamzaLiKafkaSystemProducer.java:133)
	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$writeCheckpoint$1.apply(KafkaCheckpointManager.scala:164)
	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$writeCheckpoint$1.apply(KafkaCheckpointManager.scala:163)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
	... 17 more
Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: Partition 367 on System:samzametadatasystem Topic:__samza_checkpoint_ver_1_for_comlin-feed-interactions-beam_i001 PartitionKey:0 RecordMetadataInfo:N/A
	at com.linkedin.samza.system.kafka.SamzaLiKafkaSystemProducer.lambda$send$28(SamzaLiKafkaSystemProducer.java:168)
	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:371)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:610)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:282)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:168)
	at java.lang.Thread.run(Thread.java:745)

> Recreate SystemProducer on KafkaCheckpointManager.writeCheckpoint failure.
> --------------------------------------------------------------------------
>
>                 Key: SAMZA-1638
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1638
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Priority: Major
>
> The retry loop in the KafkaCheckpointManager implementation currently reuses the same SystemProducer instance after an exception instead of recreating it.
> When the producer throws an unrecoverable exception, all subsequent operations on that instance fail, which makes the retry loop pointless.
>  
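The idea in the issue description, replacing the producer inside the retry loop instead of reusing a broken one, can be sketched roughly as below. This is only an illustration under stated assumptions: SimpleProducer and RecreatingSender are hypothetical names made up for this sketch, not Samza's SystemProducer or KafkaCheckpointManager, and the backoff between attempts (ExponentialSleepStrategy in the trace above) is left out.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a producer; NOT the Samza SystemProducer API. */
interface SimpleProducer {
    void send(String message); // may throw an unchecked exception on failure
    void close();
}

/** Hypothetical helper: retries a send and swaps in a fresh producer after each failure. */
final class RecreatingSender {
    private final Supplier<SimpleProducer> factory;
    private SimpleProducer producer;

    RecreatingSender(Supplier<SimpleProducer> factory) {
        this.factory = factory;
        this.producer = factory.get();
    }

    /** Tries to send up to maxAttempts times; rethrows the last failure if all attempts fail. */
    void sendWithRetry(String message, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.send(message);
                return; // success, nothing more to do
            } catch (RuntimeException e) {
                last = e;
                // The old instance may be permanently broken, so discard it
                // and build a new one before the next attempt.
                try {
                    producer.close();
                } catch (RuntimeException ignored) {
                    // best effort: a broken producer may also fail to close
                }
                producer = factory.get();
            }
        }
        throw last;
    }
}
```

With a factory whose first producer always fails, a call such as sendWithRetry("checkpoint", 3) would be expected to succeed on the second attempt using the second instance, whereas a loop that kept the first instance would exhaust all three attempts.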


