Posted to commits@samza.apache.org by "Shanthoosh Venkataraman (JIRA)" <ji...@apache.org> on 2019/01/16 02:35:00 UTC

[jira] [Updated] (SAMZA-2073) Do not commit the task offsets when shutting down the SamzaContainer.

     [ https://issues.apache.org/jira/browse/SAMZA-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-2073:
-------------------------------------------
    Description: 
SAMZA-1489 added support for committing the offsets of all running tasks when shutting down a SamzaContainer. Other components in Samza, such as CheckpointListener implementations, are not designed to handle a commit that happens after the consumers have been stopped.

This unnecessarily results in an unclean shutdown of a Samza standalone processor during the rebalancing phase. Here are sample logs:
{noformat}
apache.samza.container.SamzaContainer@129d533c to shutdown.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down SamzaContainer.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down consumer multiplexer.
2019/01/15 02:38:09.741 INFO [KafkaProducer] [hello-brooklin-task-i001-auditor] [hello-brooklin-task] [] [Producer clientId=hello-brooklin-task-i001-auditor, transactionalId=nullClosing the Kafka producer with timeoutMillis = 9223370489334886066 ms.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down task instance stream tasks.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down timer executor
2019/01/15 02:38:09.749 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Committing offsets for all task instances
2019/01/15 02:38:09.753 ERROR [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Caught exception/error while shutting down container.
java.lang.IllegalStateException: This consumer has already been closed.
 at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1735)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1214)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitOffsets(LiKafkaConsumerImpl.java:311)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitSync(LiKafkaConsumerImpl.java:282)
 at com.linkedin.brooklin.client.BaseConsumerImpl.commit(BaseConsumerImpl.java:136)

{noformat}

Since the final commit is not critical, it would be better to skip it during the SamzaContainer shutdown sequence.
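
The ordering problem can be illustrated with a small, self-contained Java sketch. This is a toy model only, not Samza or Kafka code: ToyConsumer, shutdownWithFinalCommit and shutdownWithoutFinalCommit are hypothetical names invented for illustration. It assumes, as the stack trace in the logs suggests, that a commit issued after the consumer is closed fails with an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the shutdown ordering issue; none of these types are Samza or Kafka classes. */
public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a consumer that rejects commits once it is closed. */
    static final class ToyConsumer {
        private boolean closed = false;
        private final List<Long> committed = new ArrayList<>();

        void commit(long offset) {
            if (closed) {
                // Same message as the exception in the sample logs.
                throw new IllegalStateException("This consumer has already been closed.");
            }
            committed.add(offset);
        }

        void close() {
            closed = true;
        }

        List<Long> committedOffsets() {
            return committed;
        }
    }

    /** Stops the consumer first, then attempts a final commit. Returns true if shutdown was clean. */
    static boolean shutdownWithFinalCommit(ToyConsumer consumer, long lastOffset) {
        consumer.close();
        try {
            consumer.commit(lastOffset);
            return true;
        } catch (IllegalStateException e) {
            // The late commit hits a closed consumer, so the shutdown is reported as unclean.
            return false;
        }
    }

    /** Stops the consumer and skips the final commit. Returns true if shutdown was clean. */
    static boolean shutdownWithoutFinalCommit(ToyConsumer consumer) {
        consumer.close();
        return true;
    }

    public static void main(String[] args) {
        // prints "with final commit, clean: false"
        System.out.println("with final commit, clean: "
                + shutdownWithFinalCommit(new ToyConsumer(), 42L));
        // prints "without final commit, clean: true"
        System.out.println("without final commit, clean: "
                + shutdownWithoutFinalCommit(new ToyConsumer()));
    }
}
```

In this sketch, offsets committed before shutdown remain in place either way; only the extra commit issued after close() is dropped, which is the step this issue proposes to remove.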

  was:
SAMZA-1489 added support for committing the offsets of all running tasks in a SamzaContainer. Other components in Samza, such as CheckpointListener implementations, are not designed to handle a commit that happens after the consumers have been stopped.

This unnecessarily results in an unclean shutdown of a Samza standalone processor during the rebalancing phase. Here are sample logs:

{noformat}

apache.samza.container.SamzaContainer@129d533c to shutdown.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down SamzaContainer.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down consumer multiplexer.
2019/01/15 02:38:09.741 INFO [KafkaProducer] [hello-brooklin-task-i001-auditor] [hello-brooklin-task] [] [Producer clientId=hello-brooklin-task-i001-auditor, transactionalId=nullClosing the Kafka producer with timeoutMillis = 9223370489334886066 ms.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down task instance stream tasks.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down timer executor
2019/01/15 02:38:09.749 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Committing offsets for all task instances
2019/01/15 02:38:09.753 ERROR [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Caught exception/error while shutting down container.
java.lang.IllegalStateException: This consumer has already been closed.
 at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1735)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1214)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitOffsets(LiKafkaConsumerImpl.java:311)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitSync(LiKafkaConsumerImpl.java:282)
 at com.linkedin.brooklin.client.BaseConsumerImpl.commit(BaseConsumerImpl.java:136)

{noformat}

Since the final commit is not critical, it would be better to skip it during the SamzaContainer shutdown sequence.


> Do not commit the task offsets when shutting down the SamzaContainer.
> ---------------------------------------------------------------------
>
>                 Key: SAMZA-2073
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2073
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major


