Posted to users@zeppelin.apache.org by Trevor Grant <tr...@gmail.com> on 2015/10/27 00:06:08 UTC

Including dependencies in Flink Interpreter

Hey all,

I'm trying to get FlinkML, a third-party package, working with
Flink in Zeppelin.

I have been playing with this all day, and I think I am getting closer.  Here
is what I have done so far:
I packaged flink-ml as a 'fat jar' and placed it in the $FLINK/lib directory.
I can run ML programs from the interactive Scala shell this way (without
adding the -a argument at the command line; this in effect always
loads the ML library).

After that, when I tried to submit jobs from Zeppelin, I got
'org.apache.flink.ml... not found' errors.  So I edited the pom.xml of the
Flink that comes prepackaged with Zeppelin (specifically, I'm using Till's
mirror of Zeppelin that supports Flink 0.10-SNAPSHOT).  I did this because
I heard there is some internal processing done to a job on the Zeppelin
side before it is submitted to Flink.
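
As a side note for anyone hitting the same wall: a 'not found' error usually means the class is missing from that JVM's classpath entirely, which is a different failure mode from the version mismatch further down. A throwaway diagnostic like the one below (hypothetical, not part of Zeppelin or Flink; the class name is just the FlinkML one from my errors) can tell the two apart from any scratch main() or notebook paragraph:

```java
// Hypothetical diagnostic -- not part of Zeppelin or Flink. Checks whether
// a class is visible to the current classloader at all.
public class ClasspathCheck {

    // Returns true if the named class can be loaded by this classloader.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Example FlinkML class; substitute whatever class you are shipping.
        String cls = "org.apache.flink.ml.optimization.IterativeSolver";
        if (isOnClasspath(cls)) {
            System.out.println("found: " + cls);
        } else {
            System.out.println("not found: " + cls
                    + " -- the jar is not on this JVM's classpath");
        }
    }
}
```

If this prints "not found" inside the interpreter process but "found" in your Scala shell, the jar is reaching one JVM and not the other.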

After that edit, I now get errors like this:
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed. at
org.apache.flink.client.program.Client.runBlocking(Client.java:370) at
org.apache.flink.client.program.Client.runBlocking(Client.java:348) at
org.apache.flink.client.program.Client.runBlocking(Client.java:315) at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
at
org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:86)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:618)
at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:545) at
.<init>(<console>:23) at .<clinit>(<console>) at .<init>(<console>:7) at
.<clinit>(<console>) at $print(<console>) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) at
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) at
scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) at
scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) at
scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) at
org.apache.zeppelin.flink.FlinkInterpreter$1.apply(FlinkInterpreter.java:272)
at
org.apache.zeppelin.flink.FlinkInterpreter$1.apply(FlinkInterpreter.java:269)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at
scala.Console$.withOut(Console.scala:107) at
scala.Console.withOut(Console.scala) at
org.apache.zeppelin.flink.FlinkInterpreter.interpret(FlinkInterpreter.java:267)
at
org.apache.zeppelin.flink.FlinkInterpreter.interpret(FlinkInterpreter.java:234)
at
org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at
org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745) Caused by:
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed. at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Initializing the UDF: Could not read
the user code wrapper: org.apache.flink.ml.optimization.IterativeSolver;
local class incompatible: stream classdesc serialVersionUID =
-7643089809976475406, local class serialVersionUID = -6684968557060820498
at
org.apache.flink.runtime.operators.BatchTask.initialize(BatchTask.java:415)
at
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.initialize(AbstractIterativeTask.java:94)
at
org.apache.flink.runtime.iterative.task.IterationTailTask.initialize(IterationTailTask.java:52)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at
java.lang.Thread.run(Thread.java:745) Caused by:
org.apache.flink.runtime.operators.util.CorruptConfigurationException:
Could not read the user code wrapper:
org.apache.flink.ml.optimization.IterativeSolver; local class incompatible:
stream classdesc serialVersionUID = -7643089809976475406, local class
serialVersionUID = -6684968557060820498 at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:286)
at
org.apache.flink.runtime.operators.BatchTask.initStub(BatchTask.java:630)
at
org.apache.flink.runtime.operators.BatchTask.initialize(BatchTask.java:412)
... 5 more Caused by: java.io.InvalidClassException:
org.apache.flink.ml.optimization.IterativeSolver; local class incompatible:
stream classdesc serialVersionUID = -7643089809976475406, local class
serialVersionUID = -6684968557060820498 at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
... 7 more


Overall, I interpret this to be an issue of mismatched versions.
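
For what it's worth, the serialVersionUID mechanics behind that InvalidClassException can be seen with plain JDK classes (nothing Flink-specific; the two classes below are made up purely for illustration):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Illustrates the serialVersionUID mechanics behind an
// InvalidClassException; the classes here are invented for the demo.
public class SerialUidDemo {

    // No explicit serialVersionUID: the JVM derives one from the class's
    // structure (fields, methods, interfaces). Recompiling against a
    // different build of a library can silently change the derived value --
    // the "stream classdesc ... local class" mismatch in the trace above.
    static class ImplicitUid implements Serializable {
        int x;
    }

    // An explicit serialVersionUID stays pinned across builds.
    static class ExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L;
        int x;
    }

    public static void main(String[] args) {
        System.out.println("implicit (JVM-derived): "
                + ObjectStreamClass.lookup(ImplicitUid.class).getSerialVersionUID());
        System.out.println("explicit (pinned): "
                + ObjectStreamClass.lookup(ExplicitUid.class).getSerialVersionUID());
    }
}
```

That is why two different builds of flink-ml (my fat jar on the cluster vs. whatever Zeppelin compiled against) can disagree on the UID for the very same class name.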

I am writing to you, the Zeppelin mailing list, for direction on my quest.

I think I am fundamentally misunderstanding the way Zeppelin submits
jobs to my Flink cluster (which can and does easily support flink-ml
tasks).

It's also possible I have gone in the wrong direction altogether and there
is some much more elegant way of adding third-party dependencies to the
Flink interpreter.

Any advice is welcome, and thank you!

tg


Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo

*"Fortunate is he, who is able to know the causes of things."  -Virgil*

Re: Including dependencies in Flink Interpreter

Posted by moon soo Lee <mo...@apache.org>.
Hi,

Here's related code in Zeppelin side.

https://github.com/apache/incubator-zeppelin/blob/master/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java#L102

The argument in the third position is for external jars; currently it
passes null.
If users could set external jars in the Flink settings in the 'Interpreter'
menu, would that be useful for you?

Thanks,
moon


On Tue, Oct 27, 2015 at 8:06 AM Trevor Grant <tr...@gmail.com>
wrote:

> Hey all,
>
> I'm trying specifically to get flinkML a third party package working with
> Flink in Zeppelin.
>
> [rest of quoted message and stack trace trimmed]