Posted to user@flink.apache.org by "Foster, Craig" <fo...@amazon.com> on 2016/08/23 17:54:42 UTC

WordCount w/ YARN and EMR local filesystem and/or HDFS

I'm trying to use the wordcount example with the local file system, but it gives me a permissions error or fails to find the file. It works just fine for input and output on S3. What is the correct URI usage for the local file system and HDFS?

I have installed Flink on EMR and am just using the flink run script to start the job:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///home/hadoop/LICENSE.txt

<snip>

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Shutting down YARN cluster

% ls -al LICENSE.txt
-rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt

Is this really a permissions issue, and if so, how would I correct it (since permissions are ostensibly correct for any other application)?

For HDFS, I tried this but got a protobuf exception:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster

Thanks,
Craig



Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by "Foster, Craig" <fo...@amazon.com>.
Oops! That makes sense; I shouldn't have used the web UI port.

Thanks!
Craig


From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 1:59 PM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Hi,

the problem is that you are using the wrong NameNode port. The RPC port is 8020, not 50070 (which is the web UI port).
On EMR, you actually don't need to specify the NameNode port at all.
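
For reference, a small sketch of the port distinction (the host name is taken from the failing command in this thread; the rewrite below is purely illustrative):

```shell
# Port 50070 is the NameNode web UI (HTTP). HDFS RPC listens on 8020 by
# default, which is why pointing an hdfs:// URI at 50070 fails with a
# protobuf parse error: the client reads an HTTP response as an RPC header.
BAD_URI="hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt"
GOOD_URI="${BAD_URI/:50070/:8020}"   # swap the web UI port for the RPC port
echo "$GOOD_URI"

# Simpler still on EMR: omit host and port entirely and let the client
# resolve them from fs.defaultFS in the cluster configuration:
echo "hdfs:///user/hadoop/LICENSE.txt"
```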

This command works for me:

[hadoop@ip-172-31-23-104 ~]$ flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs:///user/hadoop/test10Mb.db --output hdfs:///user/hadoop/out

I hope that does the trick.

Let me know if you need further help.

Regards,
Robert


On Tue, Aug 23, 2016 at 8:57 PM, Foster, Craig <fo...@amazon.com> wrote:
I am 99% sure they are the same, since this exists in the EMR yum repo on the cluster.



From: Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 11:47 AM
To: "user@flink.apache.org" <us...@flink.apache.org>, Robert Metzger <rm...@data-artisans.com>

Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

I would have to pull Robert into the loop, but my first guess is that this is a Hadoop version mismatch.

Can you double-check that the Hadoop version for which you downloaded Flink is the same as the one on the cluster?
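
A quick way to compare them, as a hedged sketch (guarded so it exits cleanly on a machine without the Hadoop CLI installed):

```shell
# Print the cluster's Hadoop version, if the CLI is present on this host.
if command -v hadoop >/dev/null 2>&1; then
    hadoop version | head -n 1     # e.g. "Hadoop 2.7.2"
fi
# Compare this against the Hadoop version encoded in the Flink download's
# name, e.g. a flink-*-bin-hadoop27 tarball is built against Hadoop 2.7.x.
```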

Greetings,
Stephan


On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig <fo...@amazon.com> wrote:
Ah cool, that makes sense then. So you must always stage your files in HDFS, S3, etc. I can live with that ;)

Then here is my HDFS exception...maybe I've got to configure something differently? I think I am using a proper HDFS URI.
For HDFS, I tried this but got a protobuf exception:
% hadoop fs -ls /user/hadoop/
Found 2 items
drwxrwxrwx   - hadoop hadoop          0 2016-08-23 17:49 /user/hadoop/.flink
-rw-r--r--   1 hadoop hadoop      15419 2016-08-23 14:56 /user/hadoop/LICENSE.txt

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip: duplicate of the protobuf stack trace shown earlier in this thread>




From: Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 11:35 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Hi!

The file "/home/hadoop/LICENSE.txt" probably exists only on the machine that starts the job (your workstation or laptop), not on the cluster nodes. The Flink processes in the cluster cannot find the file under that path.

The input data must be in a filesystem that all cluster nodes can access, such as S3, HDFS, or a mounted NFS.
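
A small local sketch of why the file:// input fails (the temp path is illustrative, and the staging commands are shown as comments because they need a live cluster):

```shell
# A file:// input is resolved on each cluster node's own disk, not on the
# client's disk, so a client-only file triggers the FileNotFoundException
# seen in the trace.
echo "to be or not to be" > /tmp/wordcount-input.txt   # exists on this host only
[ -f /tmp/wordcount-input.txt ] && echo "visible on the submitting machine"

# A YARN container on another host looks up file:///tmp/wordcount-input.txt
# on *its* disk and finds nothing. The fix is to stage the file into a
# shared filesystem first, e.g. (not run here):
#   hadoop fs -put /home/hadoop/LICENSE.txt /user/hadoop/LICENSE.txt
#   flink run ... --input hdfs:///user/hadoop/LICENSE.txt
rm -f /tmp/wordcount-input.txt
```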

Stephan


On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com> wrote:
I'm trying to use the wordcount example with the local file system, but it gives me a permissions error or fails to find the file. It works just fine for input and output on S3. What is the correct URI usage for the local file system and HDFS?

I have installed Flink on EMR and am just using the flink run script to start the job:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///home/hadoop/LICENSE.txt

<snip: duplicate of the stack trace in the original message at the top of this thread>
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Shutting down YARN cluster

% ls -al LICENSE.txt
-rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt

Are there really permissions issues, and if so, how would I correct them (since permissions are ostensibly correct for any other application)?

For HDFS, I tried this but got a protobuf exception:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster

Thanks,
Craig






Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by Robert Metzger <rm...@apache.org>.
Hi,

the problem is that you are using the wrong NameNode port: the RPC port is 8020, not 50070 (50070 is the NameNode's web UI port).
On EMR, you actually don't need to specify the NameNode port at all.
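
To make the distinction concrete, here is a small sketch (Python standard library only; the URIs are the ones from this thread) showing that the failing URI pins port 50070 in its authority, while the `hdfs:///...` form has an empty authority, so the Hadoop client falls back to the configured default filesystem (`fs.defaultFS`):

```python
from urllib.parse import urlparse

# The URI from the failing command: the authority pins the NameNode
# to port 50070, so the client attempts RPC against the web UI port.
bad = urlparse("hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt")
print(bad.hostname, bad.port)

# The recommended form: no authority at all, so the client uses
# whatever fs.defaultFS points to (on EMR, the cluster's NameNode).
good = urlparse("hdfs:///user/hadoop/LICENSE.txt")
print(repr(good.netloc), good.path)
```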

This command works for me:

[hadoop@ip-172-31-23-104 ~]$ flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs:///user/hadoop/test10Mb.db --output hdfs:///user/hadoop/out

I hope that does the trick.

Let me know if you need further help.

Regards,
Robert
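
As a side note on the earlier file:// failure: a pre-flight check like the following (a hypothetical helper, not part of Flink) can catch inputs that only exist on the submitting machine before a yarn-cluster job is launched:

```python
from urllib.parse import urlparse

# Schemes that all YARN nodes can typically reach; anything else
# (file://, or a bare local path) lives only on the submitting host.
CLUSTER_SCHEMES = {"hdfs", "s3", "s3n", "s3a", "viewfs"}

def is_cluster_visible(uri: str) -> bool:
    """Return True if the URI's scheme points at shared storage."""
    return urlparse(uri).scheme in CLUSTER_SCHEMES

print(is_cluster_visible("file:///home/hadoop/LICENSE.txt"))  # local only
print(is_cluster_visible("hdfs:///user/hadoop/LICENSE.txt"))  # shared
```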


On Tue, Aug 23, 2016 at 8:57 PM, Foster, Craig <fo...@amazon.com> wrote:

> I am 99% sure they are the same, since this exists in the EMR yum repo on
> the cluster.
>
>
>
>
>
>
>
> *From: *Stephan Ewen <se...@apache.org>
> *Reply-To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Date: *Tuesday, August 23, 2016 at 11:47 AM
> *To: *"user@flink.apache.org" <us...@flink.apache.org>, Robert Metzger <
> rmetzger@data-artisans.com>
>
> *Subject: *Re: WordCount w/ YARN and EMR local filesystem and/or HDFS
>
>
>
> I would have to pull in Robert into the loop, but my first guess is that
> this is a Hadoop version mismatch.
>
>
>
> Can you double check that the Hadoop Version for which you download Flink
> is the same as the one on the cluster?
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig <fo...@amazon.com>
> wrote:
>
> Ah cool, that makes sense then. So you must always stage your files in a
> HDFS, S3, etc. I can live with that ;)
>
>
>
> Then here is my HDFS exception...maybe I've got to configure something
> differently? I think I am using a proper HDFS URI.
>
> For HDFS, I tried this but got a protobuf exception:
>
> % hadoop fs -ls /user/hadoop/
>
> Found 2 items
>
> drwxrwxrwx   - hadoop hadoop          0 2016-08-23 17:49 /user/hadoop/.flink
>
> -rw-r--r--   1 hadoop hadoop      15419 2016-08-23 14:56 /user/hadoop/LICENSE.txt
>
>
>
> % flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt
>
>
>
> <snip: same protobuf stack trace as shown earlier in this thread>
>
> Shutting down YARN cluster
>
>
> *From: *Stephan Ewen <se...@apache.org>
> *Reply-To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Date: *Tuesday, August 23, 2016 at 11:35 AM
> *To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: WordCount w/ YARN and EMR local filesystem and/or HDFS
>
>
>
> Hi!
>
>
>
> The file "/home/hadoop/LICENSE.txt" probably exists only on the machine
> that starts the job (your workstation, laptop), not in the cluster. The
> Flink processes in the cluster cannot find the file under that address.
>
>
>
> The input data must be in a filesystem that all cluster nodes can access,
> like s3, hdfs, a mounted nfs, ...
>
>
>
> Stephan
>
>
>
>
>
> On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com>
> wrote:
>
> <snip: original message and local-filesystem stack trace, shown in full at the top of this thread>
>
>
>
> For HDFS, I tried this but got a protobuf exception:
>
>
>
> % flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar
> --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/
> user/hadoop/LICENSE.txt
>
>
>
> <snip>
>
> ------------------------------------------------------------
>
> The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74
> (Streaming WordCount)
>
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
>     at org.apache.flink.streaming.api.environment.
> StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>
>     at org.apache.flink.streaming.examples.wordcount.WordCount.
> main(WordCount.java:94)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
>
>     at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(
> CliFrontend.java:866)
>
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
>     at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1192)
>
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
>
>     at org.apache.flink.runtime.jobmanager.JobManager.org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1120)
>
>     at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
>     at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.
> applyOrElse(YarnJobManager.scala:153)
>
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>
>     at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>
>     at org.apache.flink.runtime.jobmanager.JobManager.
> aroundReceive(JobManager.scala:107)
>
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error: Failed on local exception: java.io.IOException:
> org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException: Protocol message end-group tag did not
> match expected tag.; Host Details : local host is: "ip-172-31-23-253/
> 172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.
> amazonaws.com":50070;
>
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(
> ExecutionJobVertex.java:172)
>
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> attachJobGraph(ExecutionGraph.java:679)
>
>     at org.apache.flink.runtime.jobmanager.JobManager.org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1026)
>
>     ... 25 more
>
> Caused by: java.io.IOException: Failed on local exception:
> java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException: Protocol message end-group tag did not
> match expected tag.; Host Details : local host is: "ip-172-31-23-253/
> 172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.
> amazonaws.com":50070;
>
>     at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:229)
>
>     at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
>
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:191)
>
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
>
>     at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
>
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.
> doCall(DistributedFileSystem.java:1305)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.
> doCall(DistributedFileSystem.java:1301)
>
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> FileSystemLinkResolver.java:81)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(
> DistributedFileSystem.java:1301)
>
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(
> HadoopFileSystem.java:351)
>
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(
> FileInputFormat.java:451)
>
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(
> FileInputFormat.java:57)
>
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(
> ExecutionJobVertex.java:156)
>
>     ... 27 more
>
> Caused by: java.io.IOException: org.apache.flink.hadoop.
> shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol
> message end-group tag did not match expected tag.
>
>     at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
>
>     at java.security.AccessController.doPrivileged(Native Method)
>
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>
>     at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
>
>     at org.apache.hadoop.ipc.Client$Connection.
> handleSaslConnectionFailure(Client.java:650)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(
> Client.java:737)
>
>     at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.
> java:375)
>
>     at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1451)
>
>     ... 47 more
>
> Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException: Protocol message end-group tag did not
> match expected tag.
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException.invalidEndTag(
> InvalidProtocolBufferException.java:94)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.
> parsePartialFrom(AbstractParser.java:143)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:176)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:188)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:193)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:49)
>
>     at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$
> RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> parseHeaderFrom(ProtobufRpcEngine.java:441)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> parseHeaderFrom(ProtobufRpcEngine.java:417)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.
> readFields(ProtobufRpcEngine.java:337)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> readFields(ProtobufRpcEngine.java:417)
>
>     at org.apache.hadoop.security.SaslRpcClient.saslConnect(
> SaslRpcClient.java:370)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.
> java:560)
>
>     at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.
> java:375)
>
>     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
>
>     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
>
>     at java.security.AccessController.doPrivileged(Native Method)
>
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>
>     at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(
> Client.java:724)
>
>     ... 50 more
>
> Shutting down YARN cluster
>
>
>
> Thanks,
>
> Craig
>
>
>
>
>
>
>
>
>

Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by "Foster, Craig" <fo...@amazon.com>.
I am 99% sure they are the same, since this exists in the EMR yum repo on the cluster.



From: Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 11:47 AM
To: "user@flink.apache.org" <us...@flink.apache.org>, Robert Metzger <rm...@data-artisans.com>
Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

I would have to pull in Robert into the loop, but my first guess is that this is a Hadoop version mismatch.

Can you double check that the Hadoop Version for which you download Flink is the same as the one on the cluster?
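A quick way to compare the two, sketched with illustrative paths (the exact jar name under /usr/lib/flink/lib varies by Flink build, and the snippet degrades gracefully on a machine without Hadoop installed):

```shell
# Print the cluster's Hadoop version and the Hadoop bundle shipped with
# Flink, so the two can be compared by eye.
cluster_ver=$(hadoop version 2>/dev/null | head -n 1 | awk '{print $2}')
flink_bundle=$(ls /usr/lib/flink/lib/flink-shaded-hadoop*.jar 2>/dev/null | head -n 1)
echo "cluster Hadoop: ${cluster_ver:-unknown}"
echo "Flink bundle:   ${flink_bundle:-not found}"
```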

Greetings,
Stephan


On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig <fo...@amazon.com> wrote:
Ah cool, that makes sense then. So you must always stage your files in HDFS, S3, etc. I can live with that ;)

Then here is my HDFS exception...maybe I've got to configure something differently? I think I am using a proper HDFS URI.
For HDFS, I tried this but got a protobuf exception:
% hadoop fs -ls /user/hadoop/
Found 2 items
drwxrwxrwx   - hadoop hadoop          0 2016-08-23 17:49 /user/hadoop/.flink
-rw-r--r--   1 hadoop hadoop      15419 2016-08-23 14:56 /user/hadoop/LICENSE.txt

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster

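A side note on the command above: this protobuf "end-group tag" failure is the usual symptom of speaking the HDFS RPC protocol to a port that answers HTTP. Port 50070 is the NameNode web UI, while the RPC endpoint (what fs.defaultFS points at) typically listens on 8020 on EMR. A minimal POSIX-shell sketch that pulls the port out of the URI as a sanity check:

```shell
# The URI from the failing command; RPC traffic belongs on the
# fs.defaultFS port (commonly 8020), not the 50070 web UI port.
uri="hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt"
port=${uri#hdfs://*:}   # strip scheme and host up to the port colon
port=${port%%/*}        # strip the path after the port
echo "URI port: $port"  # 50070 -> web UI, not the RPC endpoint
```

Writing the input as hdfs:///user/hadoop/LICENSE.txt (no host or port at all) lets the client fall back to the cluster's fs.defaultFS setting, which sidesteps the port question entirely.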



From: Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 11:35 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Hi!

The file "/home/hadoop/LICENSE.txt" probably exists only on the machine that starts the job (your workstation, laptop), not in the cluster. The Flink processes in the cluster cannot find the file under that address.

The input data must be in a filesystem that all cluster nodes can access, like s3, hdfs, a mounted nfs, ...

Stephan
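Concretely, staging the file into HDFS before submitting looks roughly like the sketch below (paths are illustrative, and the snippet is guarded so it is a no-op on a machine without a Hadoop client):

```shell
# Copy the local file into HDFS so every YARN node can read it, then
# submit the job with an hdfs:// input path.
if command -v hadoop >/dev/null 2>&1; then
  hadoop fs -mkdir -p /user/hadoop
  hadoop fs -put -f /home/hadoop/LICENSE.txt /user/hadoop/LICENSE.txt
  flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
      --input hdfs:///user/hadoop/LICENSE.txt
  staged=yes
else
  echo "hadoop client not on PATH; run this on the EMR master node"
  staged=no
fi
```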


On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com> wrote:
I'm trying to use the wordcount example with the local file system, but it's giving me permissions error or it's not finding it. It works just fine for input and output on S3. What is the correct URI usage for the local file system and HDFS?

I have installed Flink on EMR and am just using the flink run script to start the job:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///home/hadoop/LICENSE.txt

<snip>

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Shutting down YARN cluster

% ls -al LICENSE.txt
-rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt

Are there really permissions issues and if so how would I correct that (since permissions are ostensibly correct for any other application)?

For HDFS, I tried this but got a protobuf exception:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster
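For what it's worth, one possible culprit here (my guess, not something the logs state outright): port 50070 is conventionally the NameNode's web UI (HTTP) port in Hadoop 2.x, while the client RPC port is typically 8020. Pointing an hdfs:// URI at the HTTP port often produces exactly this kind of protobuf parse error. A sketch of the staged-in-HDFS variant that sidesteps the port question:

```shell
# Stage the input where all cluster nodes can reach it, then submit.
# The hdfs:/// form (no host:port) picks up fs.defaultFS from the
# cluster's Hadoop configuration.
hadoop fs -put -f /home/hadoop/LICENSE.txt /user/hadoop/LICENSE.txt
flink run -m yarn-cluster -yn 2 \
  /usr/lib/flink/examples/streaming/WordCount.jar \
  --input hdfs:///user/hadoop/LICENSE.txt
```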

Thanks,
Craig





Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by Stephan Ewen <se...@apache.org>.
I would have to pull in Robert into the loop, but my first guess is that
this is a Hadoop version mismatch.

Can you double-check that the Hadoop version for which you downloaded Flink
is the same as the one on the cluster?

Greetings,
Stephan
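One rough way to compare the two versions (a sketch; the log path assumes the EMR layout visible in the commands above, and the exact startup log line varies by Flink release):

```shell
# Hadoop version on the cluster:
hadoop version | head -1
# Hadoop version Flink was built against -- Flink prints it during
# startup, so grep the client/JobManager logs for it:
grep -i "hadoop version" /usr/lib/flink/log/*.log
```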


On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig <fo...@amazon.com> wrote:

> Ah cool, that makes sense then. So you must always stage your files in
> HDFS, S3, etc. I can live with that ;)
>
>
>
> Then here is my HDFS exception...maybe I've got to configure something
> differently? I think I am using a proper HDFS URI.
>
> For HDFS, I tried this but got a protobuf exception:
>
> % hadoop fs -ls /user/hadoop/
>
> Found 2 items
>
> drwxrwxrwx   - hadoop hadoop          0 2016-08-23 17:49
> /user/hadoop/.flink
>
> -rw-r--r--   1 hadoop hadoop      15419 2016-08-23 14:56
> /user/hadoop/LICENSE.txt
>
>
>
> % flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt
>
>
>
> <snip>
>
>
> Shutting down YARN cluster
>
>
>
>
>
>
>
>
>
> *From: *Stephan Ewen <se...@apache.org>
> *Reply-To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Date: *Tuesday, August 23, 2016 at 11:35 AM
> *To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: WordCount w/ YARN and EMR local filesystem and/or HDFS
>
>
>
> Hi!
>
>
>
> The file "/home/hadoop/LICENSE.txt" probably exists only on the machine
> that starts the job (your workstation, laptop), not in the cluster. The
> Flink processes in the cluster cannot find the file under that address.
>
>
>
> The input data must be in a filesystem that all cluster nodes can access,
> like S3, HDFS, a mounted NFS, ...
>
>
>
> Stephan
>
>
>
>
>
> On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com>
> wrote:
>
> <snip>
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1026)
>
>     ... 25 more
>
> Caused by: java.io.IOException: Failed on local exception:
> java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException: Protocol message end-group tag did not
> match expected tag.; Host Details : local host is: "ip-172-31-23-253/
> 172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.
> amazonaws.com":50070;
>
>     at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:229)
>
>     at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
>
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:191)
>
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
>
>     at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
>
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.
> doCall(DistributedFileSystem.java:1305)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.
> doCall(DistributedFileSystem.java:1301)
>
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> FileSystemLinkResolver.java:81)
>
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(
> DistributedFileSystem.java:1301)
>
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(
> HadoopFileSystem.java:351)
>
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(
> FileInputFormat.java:451)
>
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(
> FileInputFormat.java:57)
>
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(
> ExecutionJobVertex.java:156)
>
>     ... 27 more
>
> Caused by: java.io.IOException: org.apache.flink.hadoop.
> shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol
> message end-group tag did not match expected tag.
>
>     at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
>
>     at java.security.AccessController.doPrivileged(Native Method)
>
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>
>     at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
>
>     at org.apache.hadoop.ipc.Client$Connection.
> handleSaslConnectionFailure(Client.java:650)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(
> Client.java:737)
>
>     at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.
> java:375)
>
>     at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1451)
>
>     ... 47 more
>
> Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException: Protocol message end-group tag did not
> match expected tag.
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> InvalidProtocolBufferException.invalidEndTag(
> InvalidProtocolBufferException.java:94)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.
> parsePartialFrom(AbstractParser.java:143)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:176)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:188)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:193)
>
>     at org.apache.flink.hadoop.shaded.com.google.protobuf.
> AbstractParser.parseFrom(AbstractParser.java:49)
>
>     at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$
> RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> parseHeaderFrom(ProtobufRpcEngine.java:441)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> parseHeaderFrom(ProtobufRpcEngine.java:417)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.
> readFields(ProtobufRpcEngine.java:337)
>
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.
> readFields(ProtobufRpcEngine.java:417)
>
>     at org.apache.hadoop.security.SaslRpcClient.saslConnect(
> SaslRpcClient.java:370)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.
> java:560)
>
>     at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.
> java:375)
>
>     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
>
>     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
>
>     at java.security.AccessController.doPrivileged(Native Method)
>
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>
>     at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
>
>     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(
> Client.java:724)
>
>     ... 50 more
>
> Shutting down YARN cluster
>
>
>
> Thanks,
>
> Craig
>
>
>
>
>
>
>

Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by "Foster, Craig" <fo...@amazon.com>.
Ah cool, that makes sense then. So you must always stage your files in HDFS, S3, etc. I can live with that ;)

Then here is my HDFS exception. Maybe I've got to configure something differently? I think I am using a proper HDFS URI.
For HDFS, I tried this but got a protobuf exception:
% hadoop fs -ls /user/hadoop/
Found 2 items
drwxrwxrwx   - hadoop hadoop          0 2016-08-23 17:49 /user/hadoop/.flink
-rw-r--r--   1 hadoop hadoop      15419 2016-08-23 14:56 /user/hadoop/LICENSE.txt

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster
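[Archive editor's note, not part of the original mail: the "Protocol message end-group tag did not match expected tag" error is the kind of response Hadoop's RPC client reports when it reaches a port that speaks HTTP rather than the NameNode IPC protocol, and 50070 is conventionally the NameNode web UI port. A hedged sketch of addressing the RPC endpoint instead; 8020 is a common HDFS/EMR default, but the authoritative value is fs.defaultFS in core-site.xml:]

```shell
# Check which address this cluster actually uses for HDFS RPC
# (path assumes a standard EMR/Hadoop layout):
grep -A1 'fs.defaultFS' /etc/hadoop/conf/core-site.xml

# Address the NameNode RPC port (commonly 8020), not the web UI port 50070:
flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
  --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:8020/user/hadoop/LICENSE.txt

# Or omit host and port entirely and let fs.defaultFS resolve them:
#   --input hdfs:///user/hadoop/LICENSE.txt
```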




From: Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, August 23, 2016 at 11:35 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Hi!

The file "/home/hadoop/LICENSE.txt" probably exists only on the machine that starts the job (your workstation, laptop), not in the cluster. The Flink processes in the cluster cannot find the file under that address.

The input data must be in a filesystem that all cluster nodes can access, like s3, hdfs, a mounted nfs, ...
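[Archive editor's note, not part of the original mail: that staging workflow can be sketched as below. The paths are taken from the thread; the output path is a placeholder.]

```shell
# Stage the local file into HDFS so every YARN/Flink node can read it
# (assumes fs.defaultFS already points at the cluster's NameNode).
hadoop fs -mkdir -p /user/hadoop
hadoop fs -put -f /home/hadoop/LICENSE.txt /user/hadoop/LICENSE.txt

# Run against the staged copy; a scheme-only hdfs:/// URI is resolved
# through fs.defaultFS, so no hostname or port needs to be spelled out.
flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
  --input hdfs:///user/hadoop/LICENSE.txt \
  --output hdfs:///user/hadoop/wordcount-out

# S3 works the same way, since all nodes can reach it:
#   --input s3://my-bucket/LICENSE.txt
```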

Stephan


On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com> wrote:
I'm trying to use the wordcount example with the local file system, but it's giving me permissions error or it's not finding it. It works just fine for input and output on S3. What is the correct URI usage for the local file system and HDFS?

I have installed Flink on EMR and am just using the flink run script to start the job:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///home/hadoop/LICENSE.txt

<snip>

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Shutting down YARN cluster

% ls -al LICENSE.txt
-rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt

Are there really permissions issues and if so how would I correct that (since permissions are ostensibly correct for any other application)?

For HDFS, I tried this but got a protobuf exception:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: "ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster

Thanks,
Craig




Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

Posted by Stephan Ewen <se...@apache.org>.
Hi!

The file "/home/hadoop/LICENSE.txt" probably exists only on the machine
that submits the job (your workstation or laptop), not on the cluster
nodes, so the Flink processes in the cluster cannot find the file at
that path.

The input data must be in a filesystem that all cluster nodes can access,
such as S3, HDFS, or a mounted NFS.
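A minimal sketch of that fix, assuming an EMR cluster with HDFS running (note that hdfs:// URIs must use the NameNode's RPC port, which defaults to 8020 on EMR; port 50070 is the NameNode's web UI, and pointing an RPC client at it produces exactly the protobuf "end-group tag" error seen in the second attempt):

```shell
# Put the input into HDFS so that every YARN node can read it;
# a file under /home/hadoop exists only on the local disk of one machine.
hdfs dfs -mkdir -p /user/hadoop
hdfs dfs -put /home/hadoop/LICENSE.txt /user/hadoop/LICENSE.txt

# Re-run the job. With fs.defaultFS configured (the EMR default),
# hdfs:/// resolves to the correct NameNode and RPC port automatically:
flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
    --input hdfs:///user/hadoop/LICENSE.txt

# Or spell the NameNode out explicitly -- port 8020, not 50070:
#   --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:8020/user/hadoop/LICENSE.txt
```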

Stephan


On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig <fo...@amazon.com> wrote:

> I'm trying to use the wordcount example with the local file system, but
> it's giving me permissions error or it's not finding it. It works just fine
> for input and output on S3. What is the correct URI usage for the local
> file system and HDFS?
>
> I have installed Flink on EMR and am just using the flink run script to
> start the job:
>
> % flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///home/hadoop/LICENSE.txt
>
> <snip>
>
> The program finished with the following exception:
>
> <snip>
>
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
>     ... 25 more
> Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>     ... 27 more
>
> Shutting down YARN cluster
>
> % ls -al LICENSE.txt
> -rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt
>
> Are there really permissions issues and if so how would I correct that
> (since permissions are ostensibly correct for any other application)?
>
> For HDFS, I tried this but got a protobuf exception:
>
> % flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt
>
>
>
> <snip>
>
> ------------------------------------------------------------
>
> The program finished with the following exception:
>
> <snip>
>
>
> Shutting down YARN cluster
>
>
>
> Thanks,
>
> Craig
>