Posted to user@flink.apache.org by Thad Truman <tt...@neovest.com> on 2018/11/06 16:37:48 UTC

Kubernetes Job Cluster - Checkpointing with Parallelism > 1

Hi all,

We are trying to configure checkpointing (RocksDB) for Flink job clusters in Kubernetes.  As described here<https://github.com/apache/flink/tree/release-1.6/flink-container/kubernetes>, we use a single parallelism value both as the -Dparallelism.default argument in the job manager template<https://github.com/apache/flink/blob/release-1.6/flink-container/kubernetes/job-cluster-job.yaml.template> and as the replicas value in the task manager template<https://github.com/apache/flink/blob/release-1.6/flink-container/kubernetes/task-manager-deployment.yaml.template>.  For jobs where the parallelism is set to 1, checkpointing works well.  But when we set the parallelism to anything greater than 1, we get the error below:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 4, slots allocated: 1
                at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
                at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
                at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
                at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
                at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
                at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
                at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
                at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
                at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
                at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
                at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
                at akka.dispatch.OnComplete.internal(Future.scala:258)
                at akka.dispatch.OnComplete.internal(Future.scala:256)
                at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
                at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
                at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
                at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
                at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
                at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
                at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
                at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
                at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
                at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
                at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
                at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
                at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
                at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
                at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
                at java.lang.Thread.run(Thread.java:748)


Any ideas on how we can remediate this?

Thanks,

Thad Truman | Software Engineer | Neovest, Inc.

1145 S 800 E, Ste 310 Orem, UT 84097
+1 801 900 2480
ttruman@neovest.com<ma...@neovest.com>


Support Desk: support@neovest.com<ma...@neovest.com> / +1 800 433 4276




This email is confidential and subject to important disclaimers and conditions including on offers for purchase or sale of securities accuracy and completeness of information viruses confidentiality legal privilege and legal entity disclaimers available at www.neovest.com/disclosures.html<http://www.neovest.com/disclosures.html>





RE: Kubernetes Job Cluster - Checkpointing with Parallelism > 1

Posted by Thad Truman <tt...@neovest.com>.
Upgrading from Flink 1.6.0 to 1.6.2 appears to fix this, once we made sure that each job writes its checkpoints to a unique directory, since the job IDs all match.
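
For anyone hitting the same thing, here is a minimal sketch of one way to derive a per-job checkpoint directory. It is only an illustration, not what our deployment literally runs: the class name CheckpointDirs, the method uniqueCheckpointDir, the bucket path, and the job names are all hypothetical. The idea is that Flink places checkpoints under the configured base directory in a subdirectory named after the job ID, so jobs with identical job IDs need different base directories to avoid colliding.

```java
import java.util.Locale;

public class CheckpointDirs {

    // Hypothetical helper: appends a sanitized job name to a shared base URI
    // so that jobs reporting the same job ID do not share a checkpoint directory.
    static String uniqueCheckpointDir(String baseUri, String jobName) {
        if (baseUri == null || baseUri.isEmpty()) {
            throw new IllegalArgumentException("baseUri must not be empty");
        }
        if (jobName == null || jobName.trim().isEmpty()) {
            throw new IllegalArgumentException("jobName must not be empty");
        }
        // Lower-case the name and replace anything outside [a-z0-9._-] with '-'.
        String safeName = jobName.trim()
                .toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9._-]+", "-");
        // Make sure exactly one separator sits between base and job name.
        String base = baseUri.endsWith("/") ? baseUri : baseUri + "/";
        return base + safeName;
    }

    public static void main(String[] args) {
        // Example values only; substitute your own storage location and job names.
        String base = "s3://example-bucket/flink/checkpoints";
        System.out.println(uniqueCheckpointDir(base, "Orders Job"));
        System.out.println(uniqueCheckpointDir(base, "payments"));
    }
}
```

The resulting string would then be supplied as the checkpoint directory for that job, for example through the state.checkpoints.dir configuration key. Passing it as an extra -D argument in the job manager template, next to -Dparallelism.default, is an assumption on my part and worth verifying against your Flink version.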

Thad Truman | Software Engineer | Neovest, Inc.