You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by "Deshpande, Omkar" <Om...@intuit.com.INVALID> on 2020/03/21 22:17:18 UTC

Execution of action: JobModelVersionChange failed.

We are using beam with samza runner - beam.version 2.19.0, samza.version 1.3.0

And we are seeing the following exception frequently. Should we be tweaking some configuration? Does this point to any network connectivity issue?

2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade - Subscribing data changes on the path: /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state for barrier version: 151.
2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: JobModelVersionChange failed.
java.lang.IllegalStateException: ZkClient already closed!
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
        at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
        at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception in debounce timer! Stopping the job coordinator
java.lang.IllegalStateException: ZkClient already closed!
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
        at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
        at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator - Job Coordinator shutdown is in progress!
2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught exception/error in run loop.
org.apache.samza.SamzaException: Run loop is interrupted
        at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method)
        at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
        ... 7 common frames omitted
2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer - Shutting down SamzaContainer.
2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 10000 ms.
java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
        at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
        at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An illegal error occurred, forcibly terminating application
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
        at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
        at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
        at com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
        at com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
        at com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
Caused by: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
        at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
        at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer - Shutting down consumer multiplexer.
2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer - Shutting down task instance stream tasks.
2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer - Shutting down task thread pool
2020/03/21 21:42:09.902 INFO  c.i.s.sdk.core.SppBaseProcessor - Application finished execution; terminating Cluster.
2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 10000 ms.
java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
        at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
        at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020/03/21 21:42:09.903 INFO  c.i.s.sdk.core.SppBaseProcessor - Application Killer sleeping for 30000 ms
Exception in thread "Thread-8" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
        at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
        at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
        at org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
        at com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
        at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
        at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
        at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
        at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
2020/03/21 21:42:09.907 INFO  o.a.samza.container.SamzaContainer - Shutting down timer executor
2020/03/21 21:42:09.908 INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Re: Execution of action: JobModelVersionChange failed.

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
Hi Omkar,

> So what does stopping a samza container mean in context of a Beam Pipeline?


A container shutting down during a rebalance shouldn't affect the beam
application unless the shutdown was unsuccessful.


> Does pres.waitUntilFinish() return when samza container is being stopped ?
>

waitUntilFinish() is a blocking call and will only return when the
application exits (e.g. non-streaming jobs) or in the event of application
errors (errors within the samza container etc.).
In a standalone setup, a samza application can either be run as an individual
application or alongside other services sharing the same JVM. For
this reason, runtime errors in the samza container don't bring down the
JVM by default. However, it might be desirable for some applications to
bring down their application, and that can be achieved by using a monitor
thread that waits on *pipeline.waitUntilFinish* and then triggers
*System.exit()*, *Runtime.halt()*, or other necessary steps.

> And does rebalancing always include stopping all the containers?
>
Yes, rebalances are handled by coordinators, which bring down the existing
container and spin up a new container.

> I think Thread.sleep(30000)  in this code is increasing the time taken for
> the shutdown sequence. Is that correct?
>
I don't think that is causing delays in the shutdown. The block of code
only gets triggered after *waitUntilFinish* returns, which is due to the
exception in the container shutdown sequence. You may want to check whether
any pluggable components (like Consumers, TableProviders, etc.) take a long
time to shut down.

As for your code snippet, I think the reason shutdown signals don't
bring down your application is a deadlock
<https://stackoverflow.com/questions/25204615/deadlock-in-java-shutdown-hook>
between your main thread, which does a *System.exit()*, and the shutdown hook,
which waits on *mainThread.join()*.


Thanks,
Bharath

On Mon, Mar 23, 2020 at 3:28 PM Deshpande, Omkar
<Om...@intuit.com.invalid> wrote:

> Hey Bharath,
>
> I probably know what is taking long in my shutdown sequence.
>
> My code roughly looks like this -
> https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a
> I think Thread.sleep(30000)  in this code is increasing the time taken for
> the shutdown sequence. Is that correct?
>
> So what does stopping a samza container mean in context of a Beam
> Pipeline? Does pres.waitUntilFinish() return when samza container is being
> stopped ?
>
> We added Runtime.getRuntime().halt() because our JVM running a Beam
> pipeline was hanging up without exiting, in a lot of cases.
> Do you have suggestions on better way to handle this?
>
> And does rebalancing always include stopping all the containers?
> We are running on K8S and the pods are often moved around. And every time
> a pod is moved, rebalance will be triggered.
> And the rebalance in turn will restart all other pods.
>
>
> On 3/22/20, 10:29 PM, "Bharath Kumara Subramanian" <
> codin.martial@gmail.com> wrote:
>
>     This email is from an external sender.
>
>
>     Hi Omkar,
>
>     The errors are related to timeouts during shutdown, which gets triggered
>     during a rebalance. Whenever a new processor joins the quorum or
> leaves the
>     quorum, a rebalance is triggered, which requires all the existing
> processors
>     to shut down their containers before agreeing on the new job model.
>     In your case, it looks like the container is taking beyond the
> configured
>     timeout (task.shutdown.ms) and hence throwing an exception.
>
>     Do you know what is taking so long in your shutdown sequence?
>
>     Meanwhile, you can start by increasing the shutdown timeout to a higher
>     value.
>     *Note:* You will need to account for the increase in the *consensus
> timeout*
>     - the time the leader of the quorum will wait for other participants to
>     agree on the new job model. If the other processors are still in the
>     shutdown phase, the leader may end up expiring the current barrier and
>     trigger another rebalance.
>
>     For e.g. if the current setup is
>     *task.shutdown.ms <http://task.shutdown.ms> = 10000*
>     *job.coordinator.zk.consensus.timeout.ms
>     <http://job.coordinator.zk.consensus.timeout.ms> = 30000*
>     then your new setup will roughly (depending on how much room you
> already
>     have between these two configurations) need to be following where
> "*x*" -
>     denotes the increase in the value
>     *task.shutdown.ms <http://task.shutdown.ms> = 10000 + x*
>     *job.coordinator.zk.consensus.timeout.ms
>     <http://job.coordinator.zk.consensus.timeout.ms> = 30000 + x*
>     Let me know how it goes.
>
>     Thanks,
>     Bharath
>
>     On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
>     <Om...@intuit.com.invalid> wrote:
>
>     > We are using beam with samza runner - beam.version 2.19.0,
> samza.version
>     > 1.3.0
>     >
>     > And we are seeing the following exception frequently. Should we be
> tweaking
>     > some configuration? Does this point to any network connectivity
> issue?
>     >
>     > 2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade -
>     > Subscribing data changes on the path:
>     >
> /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state
>     > for barrier version: 151.
>     > 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
>     > Execution of action: JobModelVersionChange failed.
>     > java.lang.IllegalStateException: ZkClient already closed!
>     >         at
>     > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>     >         at
> org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>     >         at
>     > org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>     >         at
>     > org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>     >         at
>     >
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator -
>     > Received exception in debounce timer! Stopping the job coordinator
>     > java.lang.IllegalStateException: ZkClient already closed!
>     >         at
>     > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>     >         at
> org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>     >         at
>     > org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>     >         at
>     > org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>     >         at
>     >
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > 2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator -
> Job
>     > Coordinator shutdown is in progress!
>     > 2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer -
> Caught
>     > exception/error in run loop.
>     > org.apache.samza.SamzaException: Run loop is interrupted
>     >         at
>     >
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
>     >         at
> org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
>     >         at
>     >
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > Caused by: java.lang.InterruptedException: null
>     >         at java.lang.Object.wait(Native Method)
>     >         at
>     >
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
>     >         ... 7 common frames omitted
>     > 2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer -
>     > Shutting down SamzaContainer.
>     > 2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult -
>     > Container shutdown timed out after 10000 ms.
>     > java.util.concurrent.TimeoutException: Container shutdown timed out
> after
>     > 10000 ms.
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > 2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An
> illegal
>     > error occurred, forcibly terminating application
>     > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>     > java.util.concurrent.TimeoutException: Container shutdown timed out
> after
>     > 10000 ms.
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>     >         at
>     >
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
>     >         at
>     >
> com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
>     >         at
>     >
> com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
>     > Caused by: java.util.concurrent.TimeoutException: Container shutdown
> timed
>     > out after 10000 ms.
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >        at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
>     > Shutting down consumer multiplexer.
>     > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
>     > Shutting down task instance stream tasks.
>     > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
>     > Shutting down task thread pool
>     > 2020/03/21 21:42:09.902 INFO  c.i.s.sdk.core.SppBaseProcessor -
>     > Application finished execution; terminating Cluster.
>     > 2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult -
>     > Container shutdown timed out after 10000 ms.
>     > java.util.concurrent.TimeoutException: Container shutdown timed out
> after
>     > 10000 ms.
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > 2020/03/21 21:42:09.903 INFO  c.i.s.sdk.core.SppBaseProcessor -
>     > Application Killer sleeping for 30000 ms
>     > Exception in thread "Thread-8"
>     > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>     > java.util.concurrent.TimeoutException: Container shutdown timed out
> after
>     > 10000 ms.
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>     >         at
>     >
> org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
>     >         at
>     >
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
>     >         at java.lang.Thread.run(Thread.java:748)
>     > Caused by: java.util.concurrent.TimeoutException: Container shutdown
> timed
>     > out after 10000 ms.
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     >         at
>     >
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     >         at
>     >
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     >         at
>     >
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     >         at
>     >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     >         at
>     >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >         at
>     >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >         ... 1 more
>     > 2020/03/21 21:42:09.907 INFO  o.a.samza.container.SamzaContainer -
>     > Shutting down timer executor
>     > 2020/03/21 21:42:09.908 INFO  o.a.k.clients.producer.KafkaProducer -
>     > Closing the Kafka producer with timeoutMillis = 9223372036854775807
> ms.
>     >
>
>
>

Re: Execution of action: JobModelVersionChange failed.

Posted by "Deshpande, Omkar" <Om...@intuit.com.INVALID>.
Hey Bharath,

I probably know what is taking long in my shutdown sequence.

My code roughly looks like this -  https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a
I think Thread.sleep(30000)  in this code is increasing the time taken for the shutdown sequence. Is that correct?

So what does stopping a Samza container mean in the context of a Beam pipeline? Does pres.waitUntilFinish() return when the Samza container is being stopped?

We added Runtime.getRuntime().halt() because our JVM running a Beam pipeline was hanging without exiting in a lot of cases.
Do you have suggestions for a better way to handle this?

And does rebalancing always include stopping all the containers? 
We are running on K8S and the pods are often moved around. And every time a pod is moved, a rebalance is triggered.
And the rebalance in turn will restart all other pods.


On 3/22/20, 10:29 PM, "Bharath Kumara Subramanian" <co...@gmail.com> wrote:

    This email is from an external sender.
    
    
    Hi Omkar,
    
    The errors are related to timeouts during shutdown which gets triggered
    during a rebalance. Whenever a new processor joins the quorum or leaves the
    quorum, a rebalance is triggered, which requires all the existing processors
    to shut down their containers before agreeing on the new job model.
    In your case, it looks like the container is taking longer than the configured
    timeout (task.shutdown.ms) and hence throwing an exception.
    
    Do you know what is taking so long in your shutdown sequence?
    
    Meanwhile, you can start by increasing the shutdown timeout to a higher
    value.
    *Note:* You will need to account for the increase in the *consensus timeout*
    - the time the leader of the quorum will wait for other participants to
    agree on the new job model. If the other processors are still in the
    shutdown phase, the leader may end up expiring the current barrier and
    triggering another rebalance.
    
    For e.g. if the current setup is
    *task.shutdown.ms <http://task.shutdown.ms> = 10000*
    *job.coordinator.zk.consensus.timeout.ms
    <http://job.coordinator.zk.consensus.timeout.ms> = 30000*
    then your new setup will roughly (depending on how much room you already
    have between these two configurations) need to be the following, where "*x*"
    denotes the increase in the value
    *task.shutdown.ms <http://task.shutdown.ms> = 10000 + x*
    *job.coordinator.zk.consensus.timeout.ms
    <http://job.coordinator.zk.consensus.timeout.ms> = 30000 + x*
    Let me know how it goes.
    
    Thanks,
    Bharath
    
    On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
    <Om...@intuit.com.invalid> wrote:
    
    > We are using beam with samza runner - beam.version 2.19.0, samza.version
    > 1.3.0
    >
    > And we are seeing the following exception frequently. Should we be tweaking
    > some configuration? Does this point to any network connectivity issue?
    >
    > 2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade -
    > Subscribing data changes on the path:
    > /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state
    > for barrier version: 151.
    > 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
    > Execution of action: JobModelVersionChange failed.
    > java.lang.IllegalStateException: ZkClient already closed!
    >         at
    > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    >         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
    >         at
    > org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
    >         at
    > org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
    >         at
    > org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator -
    > Received exception in debounce timer! Stopping the job coordinator
    > java.lang.IllegalStateException: ZkClient already closed!
    >         at
    > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    >         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
    >         at
    > org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
    >         at
    > org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
    >         at
    > org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > 2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator - Job
    > Coordinator shutdown is in progress!
    > 2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught
    > exception/error in run loop.
    > org.apache.samza.SamzaException: Run loop is interrupted
    >         at
    > org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
    >         at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
    >         at
    > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > Caused by: java.lang.InterruptedException: null
    >         at java.lang.Object.wait(Native Method)
    >         at
    > org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
    >         ... 7 common frames omitted
    > 2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer -
    > Shutting down SamzaContainer.
    > 2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult -
    > Container shutdown timed out after 10000 ms.
    > java.util.concurrent.TimeoutException: Container shutdown timed out after
    > 10000 ms.
    >         at
    > org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
    >         at
    > org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
    >         at
    > org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > 2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An illegal
    > error occurred, forcibly terminating application
    > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
    > java.util.concurrent.TimeoutException: Container shutdown timed out after
    > 10000 ms.
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
    >         at
    > com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
    >         at
    > com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
    >         at
    > com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
    > Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
    > out after 10000 ms.
    >         at
    > org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
    >         at
    > org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
    >         at
    > org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >        at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
    > Shutting down consumer multiplexer.
    > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
    > Shutting down task instance stream tasks.
    > 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
    > Shutting down task thread pool
    > 2020/03/21 21:42:09.902 INFO  c.i.s.sdk.core.SppBaseProcessor -
    > Application finished execution; terminating Cluster.
    > 2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult -
    > Container shutdown timed out after 10000 ms.
    > java.util.concurrent.TimeoutException: Container shutdown timed out after
    > 10000 ms.
    >         at
    > org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
    >         at
    > org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
    >         at
    > org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         at java.lang.Thread.run(Thread.java:748)
    > 2020/03/21 21:42:09.903 INFO  c.i.s.sdk.core.SppBaseProcessor -
    > Application Killer sleeping for 30000 ms
    > Exception in thread "Thread-8"
    > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
    > java.util.concurrent.TimeoutException: Container shutdown timed out after
    > 10000 ms.
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
    >         at
    > org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
    >         at
    > com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
    >         at java.lang.Thread.run(Thread.java:748)
    > Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
    > out after 10000 ms.
    >         at
    > org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
    >         at
    > org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
    >         at
    > org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
    >         at
    > org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
    >         at
    > org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
    >         at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    >         at
    > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    >         at
    > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >         at
    > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >         ... 1 more
    > 2020/03/21 21:42:09.907 INFO  o.a.samza.container.SamzaContainer -
    > Shutting down timer executor
    > 2020/03/21 21:42:09.908 INFO  o.a.k.clients.producer.KafkaProducer -
    > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
    >
    


Re: Execution of action: JobModelVersionChange failed.

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
Hi Omkar,

The errors are related to timeouts during shutdown which gets triggered
during a rebalance. Whenever a new processor joins the quorum or leaves the
quorum, a rebalance is triggered, which requires all the existing processors
to shut down their containers before agreeing on the new job model.
In your case, it looks like the container is taking longer than the configured
timeout (task.shutdown.ms) and hence throwing an exception.

Do you know what is taking so long in your shutdown sequence?

Meanwhile, you can start by increasing the shutdown timeout to a higher
value.
*Note:* You will need to account for the increase in the *consensus timeout*
- the time the leader of the quorum will wait for other participants to
agree on the new job model. If the other processors are still in the
shutdown phase, the leader may end up expiring the current barrier and
triggering another rebalance.

For e.g. if the current setup is
*task.shutdown.ms <http://task.shutdown.ms> = 10000*
*job.coordinator.zk.consensus.timeout.ms
<http://job.coordinator.zk.consensus.timeout.ms> = 30000*
then your new setup will roughly (depending on how much room you already
have between these two configurations) need to be the following, where "*x*"
denotes the increase in the value
*task.shutdown.ms <http://task.shutdown.ms> = 10000 + x*
*job.coordinator.zk.consensus.timeout.ms
<http://job.coordinator.zk.consensus.timeout.ms> = 30000 + x*
Let me know how it goes.

Thanks,
Bharath

On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
<Om...@intuit.com.invalid> wrote:

> We are using beam with samza runner - beam.version 2.19.0, samza.version
> 1.3.0
>
> And we are seeing the following exception frequently. Should we be tweaking
> some configuration? Does this point to any network connectivity issue?
>
> 2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade -
> Subscribing data changes on the path:
> /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state
> for barrier version: 151.
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
> Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>         at
> org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>         at
> org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>         at
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator -
> Received exception in debounce timer! Stopping the job coordinator
> java.lang.IllegalStateException: ZkClient already closed!
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>         at
> org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>         at
> org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>         at
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator - Job
> Coordinator shutdown is in progress!
> 2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught
> exception/error in run loop.
> org.apache.samza.SamzaException: Run loop is interrupted
>         at
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
>         at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
>         at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException: null
>         at java.lang.Object.wait(Native Method)
>         at
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
>         ... 7 common frames omitted
> 2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer -
> Shutting down SamzaContainer.
> 2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult -
> Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An illegal
> error occurred, forcibly terminating application
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>         at
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
>         at
> com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
>         at
> com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
> out after 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>        at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down consumer multiplexer.
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down task instance stream tasks.
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down task thread pool
> 2020/03/21 21:42:09.902 INFO  c.i.s.sdk.core.SppBaseProcessor -
> Application finished execution; terminating Cluster.
> 2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult -
> Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.903 INFO  c.i.s.sdk.core.SppBaseProcessor -
> Application Killer sleeping for 30000 ms
> Exception in thread "Thread-8"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
>         at
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
> out after 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> 2020/03/21 21:42:09.907 INFO  o.a.samza.container.SamzaContainer -
> Shutting down timer executor
> 2020/03/21 21:42:09.908 INFO  o.a.k.clients.producer.KafkaProducer -
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>