Posted to dev@flink.apache.org by Simon Su <ba...@163.com> on 2019/10/31 08:00:43 UTC

RemoteEnvironment cannot execute job from local.


Hi all
   I want to test submitting a job from my local IDE to a Flink cluster I deployed in my VM.
   Here is my code, taken from the Flink 1.9 documentation with some of my own parameters.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment
            .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

        DataSet<String> data = env.readTextFile("/tmp/file");

        data
            .filter(new FilterFunction<String>() {
                public boolean filter(String value) {
                    return value.startsWith("http://");
                }
            })
            .writeAsText("/tmp/file1");

        env.execute();
    }

When I run the program, it raises an error like this: 


Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f32190552e955bb2048c31930edfb0e)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: TestMain$1
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: TestMain$1
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more


My understanding was that when using a remote environment, I don’t need to upload my program jar to the Flink cluster. Can anyone help me with this issue?


Thanks,
Simon


Re: RemoteEnvironment cannot execute job from local.

Posted by Till Rohrmann <tr...@apache.org>.
No, this is the expected behaviour. As I've said, you should pass
createRemoteEnvironment the jar containing your program's user code.
Otherwise Flink cannot find your filter function. Hence, it works when you
comment the filter out, because the jar is then not needed.
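
A side note on why the ClassNotFoundException names TestMain$1: anonymous
inner classes compile to synthetic names like Outer$1, and Java
serialization records only the class name, so the TaskManager can only
deserialize the filter if its classpath contains the jar with that class.
A minimal, cluster-free sketch (plain Java, no Flink; the Filter interface
here is a hypothetical stand-in for Flink's FilterFunction):

```java
import java.io.Serializable;

public class AnonymousClassDemo {
    // Hypothetical stand-in for Flink's FilterFunction.
    interface Filter extends Serializable {
        boolean accept(String value);
    }

    // Returns the runtime class name of an anonymous Filter instance.
    static String anonymousFilterName() {
        Filter f = new Filter() { // compiles to AnonymousClassDemo$1
            public boolean accept(String value) {
                return value.startsWith("http://");
            }
        };
        return f.getClass().getName();
    }

    public static void main(String[] args) {
        // The serialized form of the filter references this synthetic
        // name; a remote JVM without the jar cannot resolve it.
        System.out.println(anonymousFilterName());
    }
}
```

This is why the job succeeds once the filter is removed: without the
anonymous class there is nothing in the plan that only exists in your
local classpath.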

Cheers,
Till

On Thu, Oct 31, 2019 at 11:41 AM Simon Su <ba...@163.com> wrote:

> Hi Till
> Thanks for your reply. Actually, I modified the code as shown below:
> I commented out the filter part, re-ran the code, and it works well!
> The jar passed to createRemoteEnvironment is a UDF jar, which does not
> contain my program code.
> My Flink version is 1.9.0, so I'm confused about the actual behaviour of
> 'createRemoteEnvironment'. Is this a potential bug?
>
>         ExecutionEnvironment env = ExecutionEnvironment
>             .createRemoteEnvironment("localhost", 8081, "/tmp/udfs.jar");
>
>         DataSet<String> data = env.readTextFile("/tmp/file");
>
>         data
> //            .filter(new FilterFunction<String>() {
> //                public boolean filter(String value) {
> //                    return value.startsWith("http://");
> //                }
> //            })
>             .writeAsText("/tmp/file313");
>
>         env.execute();
>
>
> Thanks,
> Simon
>
> On 10/31/2019 17:23, Till Rohrmann <tr...@apache.org> wrote:
>
> In order to run the program on a remote cluster from the IDE you need to
> first build the jar containing your user code. This jar needs to be passed
> to createRemoteEnvironment() so that the Flink client knows which jar to
> upload. Hence, please make sure that /tmp/myudf.jar contains your user code.
>
> Cheers,
> Till
>
> On Thu, Oct 31, 2019 at 9:01 AM Simon Su <ba...@163.com> wrote:
>

Re: RemoteEnvironment cannot execute job from local.

Posted by Simon Su <ba...@163.com>.
Hi Till 
Thanks for your reply. Actually, I modified the code as shown below:
I commented out the filter part, re-ran the code, and it works well! The jar passed to createRemoteEnvironment is a UDF jar, which does not contain my program code.
My Flink version is 1.9.0, so I'm confused about the actual behaviour of 'createRemoteEnvironment'. Is this a potential bug?


        ExecutionEnvironment env = ExecutionEnvironment
            .createRemoteEnvironment("localhost", 8081, "/tmp/udfs.jar");

        DataSet<String> data = env.readTextFile("/tmp/file");

        data
//            .filter(new FilterFunction<String>() {
//                public boolean filter(String value) {
//                    return value.startsWith("http://");
//                }
//            })
            .writeAsText("/tmp/file313");

        env.execute();


Thanks,
Simon


On 10/31/2019 17:23, Till Rohrmann <tr...@apache.org> wrote:
In order to run the program on a remote cluster from the IDE you need to first build the jar containing your user code. This jar needs to be passed to createRemoteEnvironment() so that the Flink client knows which jar to upload. Hence, please make sure that /tmp/myudf.jar contains your user code.


Cheers,
Till


On Thu, Oct 31, 2019 at 9:01 AM Simon Su <ba...@163.com> wrote:





Re: RemoteEnvironment cannot execute job from local.

Posted by Simon Su <ba...@163.com>.
Hi Till 
Thanks for your reply. Actually I modify the code like this:
I commented the filter part, and re-run the code, then it works well !!  The jar passed to createRemoteEnvironment is a udf jar, which does not contain my code 
My flink version is 1.9.0, So I’m confused about the actual behaviors of ‘createRemoteEnvironment’. is it a potential bugs? 


ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, “/tmp/udfs.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
//            .filter(new FilterFunction<String>() {
//                public boolean filter(String value) {
//                    return value.startsWith("http://");
//                }
//            })
.writeAsText("/tmp/file313");

env.execute();


Thanks,
SImon


On 10/31/2019 17:23,Till Rohrmann<tr...@apache.org> wrote:
In order to run the program on a remote cluster from the IDE you need to first build the jar containing your user code. This jar needs to passed to createRemoteEnvironment() so that the Flink client knows which jar to upload. Hence, please make sure that /tmp/myudf.jar contains your user code.


Cheers,
Till


On Thu, Oct 31, 2019 at 9:01 AM Simon Su <ba...@163.com> wrote:



Hi all 
   I want to test to submit a job from my local IDE and I deployed a Flink cluster in my vm. 
   Here is my code from Flink 1.9 document and add some of my parameters.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
                }
            })
.writeAsText("/tmp/file1");

env.execute();
    }

When I run the program, I raises the error like: 


Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f32190552e955bb2048c31930edfb0e)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: TestMain$1
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: TestMain$1
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more


My understanding is that when using a remote environment, I don't need to upload my program jar to the Flink cluster. Can anyone help me with this issue?


Thanks,
Simon


Re: RemoteEnvironment cannot execute job from local.

Posted by Till Rohrmann <tr...@apache.org>.
In order to run the program on a remote cluster from the IDE, you need to
first build the jar containing your user code. This jar needs to be passed
to createRemoteEnvironment() so that the Flink client knows which jar to
upload. Hence, please make sure that /tmp/myudf.jar contains your user code.

Cheers,
Till
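
To illustrate the root cause: the ClassNotFoundException for TestMain$1 refers to the compiled anonymous FilterFunction. Below is a minimal, Flink-free sketch (plain JDK only; the class and interface names are hypothetical stand-ins, not Flink's actual API) of why the bytecode must be shipped to wherever the function is deserialized:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClassLoadingDemo {

    // Stand-in for Flink's FilterFunction; like the real one, it is Serializable.
    interface Filter extends Serializable {
        boolean filter(String value);
    }

    public static void main(String[] args) throws Exception {
        // Compiled as ClassLoadingDemo$1, analogous to TestMain$1 in the stack trace.
        Filter f = new Filter() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        };

        // The client serializes user functions roughly like this when building the job.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(f);
        }

        // Deserialization succeeds here only because ClassLoadingDemo$1 is on the
        // local classpath. On a TaskManager the class must come from the uploaded
        // jar; if the jar is missing or stale, readObject() is exactly where
        // ClassNotFoundException is thrown.
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            Filter restored = (Filter) in.readObject();
            System.out.println(restored.filter("http://flink.apache.org"));
        }
    }
}
```

So after building the jar (e.g. with mvn clean package in a standard Maven project), point createRemoteEnvironment() at the produced artifact so the anonymous classes travel with the job.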

On Thu, Oct 31, 2019 at 9:01 AM Simon Su <ba...@163.com> wrote:

>
> Hi all
>    I want to test to submit a job from my local IDE and I deployed a Flink
> cluster in my vm.
>    Here is my code from Flink 1.9 document and add some of my parameters.
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment
>             .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");
>
>         DataSet<String> data = env.readTextFile("/tmp/file");
>
>         data
>             .filter(new FilterFunction<String>() {
>                 public boolean filter(String value) {
>                     return value.startsWith("http://");
>                 }
>             })
>             .writeAsText("/tmp/file1");
>
>         env.execute();
>     }
>
> When I run the program, I raises the error like:
>
> Exception in thread "main"
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 1f32190552e955bb2048c31930edfb0e)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
> at
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at TestMain.main(TestMain.java:25)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 8 more
> *Caused by: java.lang.RuntimeException: The initialization of the
> DataSource's outputs caused an error: Could not read the user code wrapper:
> TestMain$1*
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> *Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not read the user code wrapper: TestMain$1*
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
> at
> org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
> at
> org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
> at
> org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
> at
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
> ... 3 more
> *Caused by: java.lang.ClassNotFoundException: TestMain$1*
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
> ... 9 more
>
> My understanding is that when using remote environment, I don’t need to
> upload my program jar to flink cluster. So can anyone help me for this
> issue ?
>
> Thanks,
> SImon
>
>
