Posted to user@flink.apache.org by David Dreyfus <dd...@gmail.com> on 2017/10/26 20:22:29 UTC

Execute multiple jobs in parallel (threading): java.io.OptionalDataException

Hello,

I am trying to submit multiple jobs to Flink from within my Java program.
I am running into an exception that may be related:
java.io.OptionalDataException.

Should I be able to create multiple plans/jobs within a single session and
execute them concurrently?
If so, is there a working example you can point me at?

Do I share the same ExecutionEnvironment? 
It looks like calls to getExecutionEnvironment() return the same one.

I have a number of different transformations I'd like to apply to my data.
I'd rather not combine them into one very large job; I'd prefer to run them
as separate jobs in parallel.
My cluster has enough resources that running each job sequentially would
be very wasteful.

Thank you,
David

Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at java.util.HashMap.readObject(HashMap.java:1407)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Execute multiple jobs in parallel (threading): java.io.OptionalDataException

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

Without seeing your code, I cannot tell exactly how you ended up with an
OptionalDataException.

Flink supports running multiple jobs on the same cluster; this is what we
call session mode.

You should not reuse the ExecutionEnvironment, because you would then create
a single job that simply consists of multiple disjoint parts. Calling
ExecutionEnvironment.getExecutionEnvironment gives you a fresh
ExecutionEnvironment which you can use to submit a new job. Note that you
have to call env.execute in a separate thread, because it is a blocking
operation.
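
The threading pattern described above can be sketched in plain Java. This is
only an illustration of the concurrency structure, not Flink API usage: the
runJob method below is a hypothetical stand-in for obtaining a fresh
ExecutionEnvironment, building the plan, and calling the blocking
env.execute(); the job names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelJobSubmission {

    // Hypothetical stand-in for: get a fresh ExecutionEnvironment, define
    // the transformations, then call env.execute(), which blocks until the
    // job finishes. Here we just simulate the blocking call.
    static String runJob(String name) throws InterruptedException {
        Thread.sleep(100); // simulate a blocking env.execute()
        return name + ":done";
    }

    public static void main(String[] args) throws Exception {
        // One thread per job, so the blocking execute() calls overlap
        // and the jobs run concurrently on the session cluster.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Future<String>> futures = new ArrayList<>();
        for (String job : new String[] {"jobA", "jobB", "jobC"}) {
            futures.add(pool.submit(() -> runJob(job)));
        }
        // Wait for all submissions to finish and report their results.
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```

The key point is that each thread builds its own plan on its own environment;
sharing one environment across threads would merge everything into a single
job, as noted above.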

Cheers,
Till
