You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/11/19 19:26:50 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1213: SAMZA-2305: Stream processor should ensure previous container is stopped during a rebalance

mynameborat commented on a change in pull request #1213: SAMZA-2305: Stream processor should ensure previous container is stopped during a rebalance
URL: https://github.com/apache/samza/pull/1213#discussion_r348122607
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
 ##########
 @@ -769,6 +768,27 @@ class SamzaContainer(
       else
         Thread.sleep(Long.MaxValue)
     } catch {
+      case e: InterruptedException =>
+        /*
+         * We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within
+         * our code is inside stream processor is during the following two scenarios
+         *    1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor.
+         *       Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier
+         *       to agree on the new work assignment.
+         *    2. During shutdown signals to stream processor (external or internal), the stream processor signals the container to
+         *       shutdown and waits for `task.shutdown.ms` before forcefully shutting down the container executor service which in
+         *       turn interrupts the container thread.
+         *
+         * In the both of these scenarios, the failure cause is either captured externally (timing out scenario) or internally
+         * (failed attempt to shut down the container). The act of interrupting the container thread is an explicit intent to shutdown
+         * the container since it is not capable of reacting to shutdown signals in all scenarios.
+         *
+         */
+        if (status.equals(SamzaContainerStatus.STARTED)) {
+          warn("Received an interrupt in run loop.", e)
+        } else {
+          warn("Received an interrupt during initialization.", e)
 
 Review comment:
   Instead of using status, I chose to explicitly call out the status definitions in the log to reduce the indirection. Let me know if you still think specifying the status adds value.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services