Posted to commits@samza.apache.org by pm...@apache.org on 2019/01/16 18:26:11 UTC

samza git commit: SAMZA-2073: Do not commit the task offsets when shutting down the SamzaContainer.

Repository: samza
Updated Branches:
  refs/heads/master 0c1e02518 -> 5ff7f239f


SAMZA-2073: Do not commit the task offsets when shutting down the SamzaContainer.

SAMZA-1489 added support for committing the offsets of all tasks when shutting down a SamzaContainer. Other components in Samza, such as CheckpointListeners, were not designed to account for the possibility of a commit after the consumers are stopped.

This unnecessarily results in an unclean shutdown of a Samza standalone processor during the rebalancing phase. Sample logs:
```
apache.samza.container.SamzaContainer129d533c to shutdown.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down SamzaContainer.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down consumer multiplexer.
2019/01/15 02:38:09.741 INFO [KafkaProducer] [hello-brooklin-task-i001-auditor] [hello-brooklin-task] [] [Producer clientId=hello-brooklin-task-i001-auditor, transactionalId=nullClosing the Kafka producer with timeoutMillis = 9223370489334886066 ms.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down task instance stream tasks.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down timer executor
2019/01/15 02:38:09.749 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Committing offsets for all task instances
2019/01/15 02:38:09.753 ERROR [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Caught exception/error while shutting down container.
java.lang.IllegalStateException: This consumer has already been closed.
 at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1735)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1214)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitOffsets(LiKafkaConsumerImpl.java:311)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitSync(LiKafkaConsumerImpl.java:282)
 at com.linkedin.brooklin.client.BaseConsumerImpl.commit(BaseConsumerImpl.java:136)
```

Since the final commit is not critical, it is better to skip it during the SamzaContainer shutdown sequence.
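
The failure mode in the logs can be reproduced with a minimal sketch. Everything here is an assumption made for illustration: `FakeConsumer`, `FakeTask`, and `ShutdownSketch` are hypothetical stand-ins, not Samza or Kafka classes, and the ordering is simplified from the log sequence above (consumers stopped first, commit attempted afterwards).

```scala
import scala.util.Try

// Hypothetical stand-in for a consumer that rejects commits once closed.
final class FakeConsumer {
  private var closed = false
  def close(): Unit = { closed = true }
  def commit(): Unit = {
    if (closed) throw new IllegalStateException("This consumer has already been closed.")
  }
}

// Hypothetical stand-in for a task instance whose commit goes through the consumer.
final class FakeTask(consumer: FakeConsumer) {
  var shutDown = false
  def commit(): Unit = consumer.commit()
  def shutdownTask(): Unit = { shutDown = true }
}

object ShutdownSketch {
  // Ordering before this change: consumers are stopped, then a final commit is attempted.
  def shutdownWithFinalCommit(consumer: FakeConsumer, tasks: Seq[FakeTask]): Unit = {
    consumer.close()
    tasks.foreach(_.commit())        // throws, because the consumer is already closed
    tasks.foreach(_.shutdownTask())  // never reached
  }

  // Ordering after this change: the final commit is skipped.
  def shutdownWithoutFinalCommit(consumer: FakeConsumer, tasks: Seq[FakeTask]): Unit = {
    consumer.close()
    tasks.foreach(_.shutdownTask())
  }

  def main(args: Array[String]): Unit = {
    val c1 = new FakeConsumer
    val t1 = Seq(new FakeTask(c1))
    println(Try(shutdownWithFinalCommit(c1, t1)).isFailure)

    val c2 = new FakeConsumer
    val t2 = Seq(new FakeTask(c2))
    shutdownWithoutFinalCommit(c2, t2)
    println(t2.forall(_.shutDown))
  }
}
```

In the first ordering the exception also prevents the tasks from being shut down, which is what makes the shutdown unclean. Dropping the commit lets the remaining steps run to completion.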

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #884 from shanthoosh/remove_task_commit_during_shutdown


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5ff7f239
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5ff7f239
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5ff7f239

Branch: refs/heads/master
Commit: 5ff7f239f1b54af2862496fa9f7839d7c2656f56
Parents: 0c1e025
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Wed Jan 16 10:26:07 2019 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Jan 16 10:26:07 2019 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/container/SamzaContainer.scala | 6 ------
 1 file changed, 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5ff7f239/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ec7360a..094f49a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -785,7 +785,6 @@ class SamzaContainer(
   val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
-  val isAutoCommitEnabled = config.isAutoCommitEnabled
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
@@ -1148,11 +1147,6 @@ class SamzaContainer(
       }
     }
 
-    if (isAutoCommitEnabled) {
-      info("Committing offsets for all task instances")
-      taskInstances.values.foreach(_.commit)
-    }
-
     taskInstances.values.foreach(_.shutdownTask)
   }