Posted to user@flink.apache.org by Timur Fayruzov <ti...@gmail.com> on 2016/04/04 23:51:11 UTC

Integrate Flink with S3 on EMR cluster

Hello,

I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
with a three-step procedure: load data from S3 into the cluster's HDFS, run
the Flink job, and unload the outputs from HDFS to S3.

However, ideally I'd like to read/write data directly from/to S3. I
followed the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
More specifically, I:
  1. Modified flink-conf to point to /etc/hadoop/conf
  2. Modified core-site.xml per the link above (it is not clear why it is not
     using IAM; I had to provide AWS keys explicitly).

I ran the following command:
HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster \
  -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar \
  --input s3://<my key> --output hdfs:///flink-output

First, I see messages like this:
2016-04-04 21:37:10,418 INFO  org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening key '<my key>' for reading at position '333000'

Then, it fails with the following error:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
    ... 21 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 23 more


Somehow, it still tries to use EMRFS (which may be a valid thing?), but
it is failing to initialize. I don't know enough about EMRFS/S3 interop, so
I don't know how to diagnose it further.

I run Flink 1.0.0 compiled for Scala 2.11.

Any advice on how to make it work is highly appreciated.


Thanks,

Timur

Re: Integrate Flink with S3 on EMR cluster

Posted by Robert Metzger <rm...@apache.org>.
Hi Vinay,

Setting the HADOOP_CLASSPATH variable on the client machine is the
recommended way to solve this problem.

I'll update the documentation accordingly.


On Wed, Mar 8, 2017 at 10:26 AM, vinay patil <vi...@gmail.com>
wrote:

> Hi,
>
> @Shannon - I am not facing any issues while writing to S3; I was getting
> NoClassDef errors when reading the file from S3.
>
> By "Hadoop File System" I mean that I am using Hadoop's FileSystem class to
> read the file from S3.
>
> @Stephan - I tried with 1.1.4 and got the same issue.
>
> The easiest way I found is to run the "hadoop classpath" command and export
> its output as the HADOOP_CLASSPATH variable.
>
> This way we don't have to copy any S3-specific jars to the Flink lib folder.

Re: Integrate Flink with S3 on EMR cluster

Posted by vinay patil <vi...@gmail.com>.
Hi,

@Shannon - I am not facing any issues while writing to S3; I was getting
NoClassDef errors when reading the file from S3.

By "Hadoop File System" I mean that I am using Hadoop's FileSystem class to
read the file from S3.

@Stephan - I tried with 1.1.4 and got the same issue.

The easiest way I found is to run the "hadoop classpath" command and export
its output as the HADOOP_CLASSPATH variable.

This way we don't have to copy any S3-specific jars to the Flink lib folder.
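
A minimal sketch of that step, assuming the `hadoop` executable is on the client machine's PATH. The function name `export_hadoop_classpath` is made up for illustration and is not part of Flink or Hadoop:

```shell
# Sketch only: export the cluster's Hadoop classpath so that the Flink client
# (bin/flink) can pick it up. "export_hadoop_classpath" is a hypothetical name.
export_hadoop_classpath() {
  # Fail early if the hadoop executable is not available on this machine.
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop executable not found on PATH" >&2
    return 1
  fi
  # "hadoop classpath" prints the colon-separated classpath Hadoop itself uses.
  HADOOP_CLASSPATH="$(hadoop classpath)"
  export HADOOP_CLASSPATH
}

# Possible usage before submitting a job from the same shell:
#   export_hadoop_classpath && flink-1.0.0/bin/flink run ...
```

This only affects the shell it runs in, so it would need to be repeated (or placed in a profile script) wherever the Flink client is started.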



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12101.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Integrate Flink with S3 on EMR cluster

Posted by Stephan Ewen <se...@apache.org>.
You can always explicitly request a broadcast join, via "joinWithTiny",
"joinWithHuge", or by supplying a JoinHint.

Greetings,
Stephan


On Sat, Apr 9, 2016 at 1:56 AM, Timur Fayruzov <ti...@gmail.com>
wrote:

> Thank you Robert. One of my test cases is a broadcast join, so I need to
> make statistics work. The only workaround I have found so far is to copy
> the contents of /usr/share/aws/emr/emrfs/lib/, /usr/share/aws/aws-java-sdk/
> and /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar to flink/lib.
> Putting these directories on HADOOP_CLASSPATH unfortunately did not work (I
> am running a single-machine cluster, so the YARN ResourceManager and
> NodeManager share the same machine). Apparently, the classpath for YARN
> containers does not include HADOOP_CLASSPATH.
>
> I'm not a Hadoop expert, so the relationship between YARN, the hadoop
> executable and Flink seems strange to me. The hadoop executable sets up a lot
> of env vars (including the hadoop classpath), but it seems that this setup has
> no effect on YARN application containers. I'm not sure whether this is expected.
>
> Thanks,
> Timur

Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
Thank you Robert. One of my test cases is a broadcast join, so I need to make
statistics work. The only workaround I have found so far is to copy the
contents of /usr/share/aws/emr/emrfs/lib/, /usr/share/aws/aws-java-sdk/ and
/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar to flink/lib.
Putting these directories on HADOOP_CLASSPATH unfortunately did not work (I
am running a single-machine cluster, so the YARN ResourceManager and
NodeManager share the same machine). Apparently, the classpath for YARN
containers does not include HADOOP_CLASSPATH.

I'm not a Hadoop expert, so the relationship between YARN, the hadoop
executable and Flink seems strange to me. The hadoop executable sets up a lot
of env vars (including the hadoop classpath), but it seems that this setup has
no effect on YARN application containers. I'm not sure whether this is expected.

Thanks,
Timur
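
The copy workaround described above could be scripted roughly as follows. This is only a sketch: the helper name `copy_jars_to_lib` is hypothetical, and the paths in the usage comment are the ones from this message, which may differ on other EMR releases.

```shell
# Sketch only: copy jars into a Flink lib directory.
# Usage: copy_jars_to_lib <lib_dir> <jar-file-or-directory>...
copy_jars_to_lib() {
  lib_dir="$1"
  shift
  mkdir -p "$lib_dir" || return 1
  for src in "$@"; do
    if [ -f "$src" ]; then
      # A single jar file was given: copy it as-is.
      cp "$src" "$lib_dir/" || return 1
    elif [ -d "$src" ]; then
      # A directory was given: copy the jars directly inside it.
      for jar in "$src"/*.jar; do
        # Skip the unexpanded pattern when the directory holds no jars.
        [ -f "$jar" ] || continue
        cp "$jar" "$lib_dir/" || return 1
      done
    else
      echo "skipping missing path: $src" >&2
    fi
  done
}

# Possible usage with the paths mentioned in this message:
#   copy_jars_to_lib flink-1.0.0/lib \
#     /usr/share/aws/emr/emrfs/lib \
#     /usr/share/aws/aws-java-sdk \
#     /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar
```

Copying rather than symlinking keeps the Flink installation self-contained, at the cost of having to repeat the copy when the EMR-provided jars change.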

On Fri, Apr 8, 2016 at 2:38 AM, Robert Metzger <rm...@apache.org> wrote:

> Hi Timur,
>
> The Flink optimizer runs on the client, so the exception is thrown from
> the JVM running the ./bin/flink client.
> Since the statistics sampling is an optional step, it's surrounded by a
> try/catch block that just logs the error message.
>
> More answers inline below
>
>
> On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov <ti...@gmail.com>
> wrote:
>
>> The exception does not show up in the console when I run the job; it only
>> shows in the logs. I thought this meant that it happens on either the AM or
>> a TM (I assume what I see in stdout is the client log). Is my thinking right?
>>
>>
>> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <uc...@apache.org> wrote:
>>
>>> Hey Timur,
>>>
>>> Just had a chat with Robert about this. I agree that the error message
>>> is confusing, but it is fine in this case. The file system classes are
>>> not on the class path of the client process, which is submitting the
>>> job.
>>
>> Do you mean that the classes should be in the classpath of the
>> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
>> tried to add the EMRFS jars to this classpath but it did not help. BTW, it
>> seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
>> is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only
>> point that I control here to add to the classpath, so I had to set it manually.
>>
>
> Yes, they have to be in the classpath of the CliFrontend.
> The client should also work without the HADOOP_CLASSPATH being set. It's
> optional for cases where you want to manually add jars to the classpath.
> For example on Google Compute they set the HADOOP_CLASSPATH.
>
> Please note that we are not transferring the contents of the
> HADOOP_CLASSPATH to the other workers on the cluster. So you have to set
> the HADOOP_CLASSPATH on all machines.
> Another approach is just putting the required jar into the "lib/" folder
> of your Flink installation (the folder is next to "bin/", "conf/", "logs/").
>
>
>>
>>
>>> It fails to sample the input file sizes, but this is just an
>>> optimization step and hence it does not fail the job and only logs the
>>> error.
>>>
>> Is this optimization only for the client side? In other words, does it
>> affect Flink's ability to choose the proper type of join?
>>
>
> Your DataSet program is translated into a generic representation. Then,
> this representation is passed into the optimizer, which decides on join /
> sorting / data shipping strategies. The output of the optimizer is sent to
> the JobManager for execution.
> If the optimizer is not able to get good statistics about the input (like
> in your case), it will default to robust execution strategies. I don't know
> the input sizes of your job and the structure of your job, but chances are
> high that the final plan is the same with and without the input statistics.
> Only in cases where one join side is very small might the input statistics
> be relevant.
> Other optimizations, such as reusing existing data partitioning or
> ordering, work independently of the input sampling.
>
>
>>
>>
>>>
>>> After the job is submitted everything should run as expected.
>>>
>>> You should be able to get rid of that exception by adding the missing
>>> classes to the class path of the client process (started via
>>> bin/flink), for example via the lib folder.
>>>
>> The above approach did not work. Could you elaborate on what you meant by
>> 'lib folder'?
>>
>
> See above.
>
>
>
>>
>> Thanks,
>> Timur
>>
>>
>>> – Ufuk
>>>
>>>
>>>
>>>
>>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <ti...@gmail.com>
>>> wrote:
>>> > There's one more filesystem integration failure that I have found. My job on
>>> > a toy dataset succeeds, but Flink log contains the following message:
>>> > 2016-04-07 18:10:01,339 ERROR org.apache.flink.api.common.io.DelimitedInputFormat - Unexpected problen while getting the file statistics for file 's3://...': java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>> > java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>>> >         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>>> >         at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>>> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>>> >         at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>>> >         at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>>> >         at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>>> >         at org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>>> >         at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>>> >         at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>>> >         at org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>>> >         at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>>> >         at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>>> >         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>>> >         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>> >         at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>>> >         at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>>> >         at org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>>> >         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>> >         at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>> >         at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>> >         at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>>> >         at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>> >         at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>> >         at scala.Option.foreach(Option.scala:257)
>>> >         at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>>> >         at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >         at java.lang.reflect.Method.invoke(Method.java:606)
>>> >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> >         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> >         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>>> >         ... 37 more
>>> > Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>> >         at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>> >         ... 38 more
>>> >
>>> > I assume this may be a big problem if run on large datasets as there will be
>>> > no information for optimizer. I tried to change EMRFS to NativeS3 driver,
>>> > but get the same error, which is surprising. I expected NativeS3FileSystem
>>> > to be in the classpath since it ships with Flink runtime.
>>> >
>>> > Thanks,
>>> > Timur
>>> >
>>> >
>>> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uc...@apache.org> wrote:
>>> >>
>>> >> Yes, for sure.
>>> >>
>>> >> I added some documentation for AWS here:
>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>>> >>
>>> >> Would be nice to update that page with your pull request. :-)
>>> >>
>>> >> – Ufuk
>>> >>
>>> >>
>>> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org> wrote:
>>> >> > Hi Timur,
>>> >> >
>>> >> > Great! Bootstrap action for Flink is good for AWS users. I think the
>>> >> > bootstrap action scripts would be placed in `flink-contrib` directory.
>>> >> >
>>> >> > If you want, one of the Flink PMC members will assign FLINK-1337 to
>>> >> > you.
>>> >> >
>>> >> > Regards,
>>> >> > Chiwan Park
>>> >> >
>>> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairuzov@gmail.com>
>>> >> >> wrote:
>>> >> >>
>>> >> >> I had a guide like that.
>>> >> >>
>>> >> >
>>> >
>>> >
>>>
>>
>>
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Robert Metzger <rm...@apache.org>.
Hi Timur,

The Flink optimizer runs on the client, so the exception is thrown from the
JVM running the ./bin/flink client.
Since the statistics sampling is an optional step, it's surrounded by a
try/catch block that just logs the error message.

More answers inline below


On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov <ti...@gmail.com>
wrote:

> The exception does not show up in the console when I run the job; it only
> shows in the logs. I thought this meant that it happens on either the AM or
> a TM (I assume what I see in stdout is the client log). Is my thinking right?
>
>
> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <uc...@apache.org> wrote:
>
>> Hey Timur,
>>
>> Just had a chat with Robert about this. I agree that the error message
>> is confusing, but it is fine it this case. The file system classes are
>> not on the class path of the client process, which is submitting the
>> job.
>
> Do you mean that classes should be in the classpath of
> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
> tried to add EMRFS jars to this classpath but it did not help. BTW, it
> seems that bin/flink script assumes that HADOOP_CLASSPATH is set, which is
> not the case in my EMR setup. The HADOOP_CLASSPATH seem to be the only
> point that I control here to add to classpath, so I had to set it manually.
>

Yes, they have to be in the classpath of the CliFrontend.
The client should also work without the HADOOP_CLASSPATH being set. It is
optional, for cases where you want to manually add jars to the classpath.
For example, on Google Compute they set the HADOOP_CLASSPATH.

Please note that we are not transferring the contents of the
HADOOP_CLASSPATH to the other workers on the cluster. So you have to set
the HADOOP_CLASSPATH on all machines.
Another approach is just putting the required jar into the "lib/" folder of
your Flink installation (the folder is next to "bin/", "conf/", "logs/").
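
One way to check whether a class is actually visible to a JVM is a small
probe like the sketch below. It is only a diagnostic idea, not part of Flink:
the class ClasspathCheck is hypothetical, and it only tells you something
useful if you run it with the same classpath that the client JVM ends up
with.

```java
// Hypothetical diagnostic helper; not part of Flink or EMR.
public class ClasspathCheck {

    /** Returns true if the named class can be found by this class's class loader. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only look the class up, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the exception from this thread.
        String name = args.length > 0 ? args[0] : "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
        System.out.println(name + " loadable: " + isLoadable(name));
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}
```

If the probe reports the class as not loadable, the jar that contains it is
not on that JVM's classpath, whatever HADOOP_CLASSPATH is set to in the shell.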


>
>
>> It fails to sample the input file sizes, but this is just an
>> optimization step and hence it does not fail the job and only logs the
>> error.
>>
> Is this optimization only for client side? In other words, does it affect
> Flink's ability to choose proper type of a join?
>

Your DataSet program is translated into a generic representation. Then,
this representation is passed into the optimizer, which decides on join /
sorting / data shipping strategies. The output of the optimizer is sent to
the JobManager for execution.
If the optimizer is not able to get good statistics about the input (like
in your case), it will default to robust execution strategies. I don't know
the input sizes of your job and the structure of your job, but chances are
high that the final plan is the same with and without the input statistics.
Only in cases where one join side is very small might the input statistics
be relevant.
Other optimizations, such as reusing existing data partitioning or ordering,
work independently of the input sampling.
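
As a rough illustration of the fallback behaviour, the toy sketch below picks
a broadcast strategy only when a size estimate exists and is small, and
otherwise falls back to repartitioning both inputs. This is not the Flink
optimizer: the enum, the method and the 10 MB threshold are all hypothetical.

```java
import java.util.OptionalLong;

// Toy model only; names and threshold are assumptions, not Flink internals.
public class JoinStrategySketch {

    public enum Strategy { BROADCAST_FIRST, BROADCAST_SECOND, REPARTITION_BOTH }

    // Hypothetical cut-off below which an input counts as "very small".
    static final long SMALL_INPUT_BYTES = 10L * 1024 * 1024;

    /** Chooses a strategy from optional size estimates of the two join inputs. */
    public static Strategy choose(OptionalLong firstSize, OptionalLong secondSize) {
        if (firstSize.isPresent() && firstSize.getAsLong() <= SMALL_INPUT_BYTES) {
            return Strategy.BROADCAST_FIRST;
        }
        if (secondSize.isPresent() && secondSize.getAsLong() <= SMALL_INPUT_BYTES) {
            return Strategy.BROADCAST_SECOND;
        }
        // No usable statistics, or both sides large: the robust default.
        return Strategy.REPARTITION_BOTH;
    }

    public static void main(String[] args) {
        System.out.println(choose(OptionalLong.empty(), OptionalLong.empty()));
        System.out.println(choose(OptionalLong.of(1024), OptionalLong.empty()));
    }
}
```

With no statistics for either input the sketch always takes the robust
default, which matches the plan it would pick for two large inputs anyway.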


>
>
>>
>> After the job is submitted everything should run as expected.
>>
>> You should be able to get rid of that exception by adding the missing
>> classes to the class path of the client process (started via
>> bin/flink), for example via the lib folder.
>>
> The above approach did not work, could you elaborate what you meant by
> 'lib folder'?
>

See above.



>
> Thanks,
> Timur
>
>
>> – Ufuk
>>
>>
>>
>>
>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <ti...@gmail.com>
>> wrote:
>> > There's one more filesystem integration failure that I have found. My
>> job on
>> > a toy dataset succeeds, but Flink log contains the following message:
>> > 2016-04-07 18:10:01,339 ERROR
>> > org.apache.flink.api.common.io.DelimitedInputFormat           -
>> Unexpected
>> > problen while getting the file statistics for file 's3://...':
>> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> > java.lang.RuntimeException: java.lang.RuntimeException:
>> > java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>> >         at
>> >
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>> >         at
>> >
>> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>> >         at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>> >         at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>> >         at
>> >
>> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>> >         at
>> >
>> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>> >         at
>> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>> >         at
>> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> >         at
>> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>> >         at
>> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>> >         at
>> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>> >         at
>> >
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >         at
>> >
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>> >         at
>> >
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>> >         at scala.Option.foreach(Option.scala:257)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>> >         at
>> > com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >         at
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >         at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >         at java.lang.reflect.Method.invoke(Method.java:606)
>> >         at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >         at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >         at
>> > org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >         at
>> >
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >         at
>> >
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >         at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
>> > Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>> >         ... 37 more
>> > Caused by: java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> >
>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>> >         ... 38 more
>> >
>> > I assume this may be a big problem if run on large datasets as there
>> will be
>> > no information for optimizer. I tried to change EMRFS to NativeS3
>> driver,
>> > but get the same error, which is surprising. I expected
>> NativeS3FileSystem
>> > to be in the classpath since it ships with Flink runtime.
>> >
>> > Thanks,
>> > Timur
>> >
>> >
>> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uc...@apache.org> wrote:
>> >>
>> >> Yes, for sure.
>> >>
>> >> I added some documentation for AWS here:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>> >>
>> >> Would be nice to update that page with your pull request. :-)
>> >>
>> >> – Ufuk
>> >>
>> >>
>> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org>
>> wrote:
>> >> > Hi Timur,
>> >> >
>> >> > Great! Bootstrap action for Flink is good for AWS users. I think the
>> >> > bootstrap action scripts would be placed in `flink-contrib`
>> directory.
>> >> >
>> >> > If you want, one of people in PMC of Flink will be assign FLINK-1337
>> to
>> >> > you.
>> >> >
>> >> > Regards,
>> >> > Chiwan Park
>> >> >
>> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <
>> timur.fairuzov@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> I had a guide like that.
>> >> >>
>> >> >
>> >
>> >
>>
>
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
The exception does not show up in the console when I run the job; it only
shows in the logs. I thought that meant it happens on either the AM or the TM
(I assume what I see in stdout is the client log). Is my thinking right?


On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Timur,
>
> Just had a chat with Robert about this. I agree that the error message
> is confusing, but it is fine it this case. The file system classes are
> not on the class path of the client process, which is submitting the
> job.

Do you mean that the classes should be in the classpath of the
`org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
tried to add the EMRFS jars to this classpath but it did not help. BTW, it
seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only
point that I control here to add to the classpath, so I had to set it manually.


> It fails to sample the input file sizes, but this is just an
> optimization step and hence it does not fail the job and only logs the
> error.
>
Is this optimization only for the client side? In other words, does it affect
Flink's ability to choose the proper type of join?


>
> After the job is submitted everything should run as expected.
>
> You should be able to get rid of that exception by adding the missing
> classes to the class path of the client process (started via
> bin/flink), for example via the lib folder.
>
The above approach did not work. Could you elaborate on what you meant by
'lib folder'?

Thanks,
Timur


> – Ufuk
>
>
>
>
> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <ti...@gmail.com>
> wrote:
> > There's one more filesystem integration failure that I have found. My
> job on
> > a toy dataset succeeds, but Flink log contains the following message:
> > 2016-04-07 18:10:01,339 ERROR
> > org.apache.flink.api.common.io.DelimitedInputFormat           -
> Unexpected
> > problen while getting the file statistics for file 's3://...':
> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > java.lang.RuntimeException: java.lang.RuntimeException:
> > java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> >         at
> >
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> >         at
> >
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> >         at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
> >         at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
> >         at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
> >         at
> >
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
> >         at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
> >         at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
> >         at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
> >         at
> >
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
> >         at
> > org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
> >         at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
> >         at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> >         at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
> >         at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
> >         at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
> >         at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >         at
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >         at
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> >         at scala.Option.foreach(Option.scala:257)
> >         at
> > com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
> >         at
> > com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >         at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:606)
> >         at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >         at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >         at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >         at
> >
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >         at
> >
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >         at
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
> > Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
> >         ... 37 more
> > Caused by: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> >
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
> >         ... 38 more
> >
> > I assume this may be a big problem if run on large datasets as there
> will be
> > no information for optimizer. I tried to change EMRFS to NativeS3 driver,
> > but get the same error, which is surprising. I expected
> NativeS3FileSystem
> > to be in the classpath since it ships with Flink runtime.
> >
> > Thanks,
> > Timur
> >
> >
> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uc...@apache.org> wrote:
> >>
> >> Yes, for sure.
> >>
> >> I added some documentation for AWS here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
> >>
> >> Would be nice to update that page with your pull request. :-)
> >>
> >> – Ufuk
> >>
> >>
> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org>
> wrote:
> >> > Hi Timur,
> >> >
> >> > Great! Bootstrap action for Flink is good for AWS users. I think the
> >> > bootstrap action scripts would be placed in `flink-contrib` directory.
> >> >
> >> > If you want, one of people in PMC of Flink will be assign FLINK-1337
> to
> >> > you.
> >> >
> >> > Regards,
> >> > Chiwan Park
> >> >
> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairuzov@gmail.com
> >
> >> >> wrote:
> >> >>
> >> >> I had a guide like that.
> >> >>
> >> >
> >
> >
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Timur,

Just had a chat with Robert about this. I agree that the error message
is confusing, but it is fine in this case. The file system classes are
not on the class path of the client process, which is submitting the
job. It fails to sample the input file sizes, but this is just an
optimization step and hence it does not fail the job and only logs the
error.

After the job is submitted everything should run as expected.

You should be able to get rid of that exception by adding the missing
classes to the class path of the client process (started via
bin/flink), for example via the lib folder.

– Ufuk




On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <ti...@gmail.com> wrote:
> There's one more filesystem integration failure that I have found. My job on
> a toy dataset succeeds, but Flink log contains the following message:
> 2016-04-07 18:10:01,339 ERROR
> org.apache.flink.api.common.io.DelimitedInputFormat           - Unexpected
> problen while getting the file statistics for file 's3://...':
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>         at
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>         at
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>         at
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>         at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>         at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>         at
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>         at
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>         at
> org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>         at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>         at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>         at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>         at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>         at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>         at scala.Option.foreach(Option.scala:257)
>         at
> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>         at
> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
> Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>         ... 37 more
> Caused by: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>         ... 38 more
>
> I assume this may be a big problem if run on large datasets as there will be
> no information for optimizer. I tried to change EMRFS to NativeS3 driver,
> but get the same error, which is surprising. I expected NativeS3FileSystem
> to be in the classpath since it ships with Flink runtime.
>
> Thanks,
> Timur
>
>
> On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uc...@apache.org> wrote:
>>
>> Yes, for sure.
>>
>> I added some documentation for AWS here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>>
>> Would be nice to update that page with your pull request. :-)
>>
>> – Ufuk
>>
>>
>> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org> wrote:
>> > Hi Timur,
>> >
>> > Great! Bootstrap action for Flink is good for AWS users. I think the
>> > bootstrap action scripts would be placed in `flink-contrib` directory.
>> >
>> > If you want, one of people in PMC of Flink will be assign FLINK-1337 to
>> > you.
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <ti...@gmail.com>
>> >> wrote:
>> >>
>> >> I had a guide like that.
>> >>
>> >
>
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Stephan Ewen <se...@apache.org>.
@vinay patil - Can you see if the same problem occurs if you use Flink 1.1
- to see if this is a regression in Flink 1.2?



On Tue, Mar 7, 2017 at 6:43 PM, Shannon Carey <sc...@expedia.com> wrote:

> Generally, using S3 filesystem in EMR with Flink has worked pretty well
> for me in Flink < 1.2 (unless you run out of connections in your HTTP
> pool). When you say, "using Hadoop File System class", what do you mean?
> In my experience, it's sufficient to just use the "s3://" filesystem
> protocol and Flink's Hadoop integration (plus S3 filesystem classes
> provided by EMR) will do the right thing.
>
> -Shannon
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Shannon Carey <sc...@expedia.com>.
Generally, using S3 filesystem in EMR with Flink has worked pretty well for me in Flink < 1.2 (unless you run out of connections in your HTTP pool). When you say, "using Hadoop File System class", what do you mean? In my experience, it's sufficient to just use the "s3://" filesystem protocol and Flink's Hadoop integration (plus S3 filesystem classes provided by EMR) will do the right thing.

-Shannon

Re: Integrate Flink with S3 on EMR cluster

Posted by vinay patil <vi...@gmail.com>.
Hi Guys,

Has anyone got this error before? If yes, have you found any other
solution apart from copying the jar files to the Flink lib folder?

Regards,
Vinay Patil

On Mon, Mar 6, 2017 at 8:21 PM, vinay patil [via Apache Flink User Mailing
List archive.] <ml...@n4.nabble.com> wrote:

> Hi Guys,
>
> I am getting the same exception:
> EMRFileSystem not Found
>
> I am trying to read encrypted S3 file using Hadoop File System class.
>  (using Flink 1.2.0)
> When I copy all the libs from /usr/share/aws/emrfs/lib and /usr/lib/hadoop
> to Flink lib folder , it works.
>
> However I see that all these libs are already included in the Hadoop
> classpath.
>
> Is there any other way I can make this work ?
>
>





Re: Integrate Flink with S3 on EMR cluster

Posted by vinay patil <vi...@gmail.com>.
Hi Guys,

I am getting the same exception:
EMRFileSystem not Found

I am trying to read an encrypted S3 file using the Hadoop File System class
(using Flink 1.2.0).
When I copy all the libs from /usr/share/aws/emrfs/lib and /usr/lib/hadoop
to the Flink lib folder, it works.

However, I see that all these libs are already included in the Hadoop
classpath.

Is there any other way I can make this work?




Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
There's one more filesystem integration failure that I have found. My job
on a toy dataset succeeds, but the Flink log contains the following message:
2016-04-07 18:10:01,339 ERROR org.apache.flink.api.common.io.DelimitedInputFormat           - Unexpected problen while getting the file statistics for file 's3://...': java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
        at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
        at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
        at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
        at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
        at org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
        at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
        at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
        at org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
        at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
        at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
        at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:314)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
        at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
        at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
        at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
        at scala.Option.foreach(Option.scala:257)
        at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
        at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
        ... 37 more
Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
        ... 38 more

I assume this may be a big problem when running on large datasets, as the
optimizer will have no information to work with. I tried switching from EMRFS
to the NativeS3 driver but got the same error, which is surprising: I expected
NativeS3FileSystem to be on the classpath since it ships with the Flink runtime.
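
One way to narrow down a ClassNotFoundException like the one above is to check
which jars, if any, actually contain the class. The following is a minimal
Python sketch under stated assumptions, not part of the original setup: the
helper name jars_containing is hypothetical, and the default search directory
(flink-1.0.0/lib) is only a guess based on the paths used in this thread. Pass
the real directories on the command line.

```python
import sys
import zipfile
from pathlib import Path


def jars_containing(class_name, search_dirs):
    """Return the jar files under search_dirs that contain class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for directory in search_dirs:
        root = Path(directory)
        if not root.is_dir():
            # Silently skip directories that do not exist on this machine.
            continue
        for jar in sorted(root.rglob("*.jar")):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if entry in archive.namelist():
                        matches.append(jar)
            except (zipfile.BadZipFile, OSError):
                # Skip unreadable or corrupt archives instead of aborting the scan.
                continue
    return matches


if __name__ == "__main__":
    # Hypothetical default; pass the real jar directories as arguments.
    dirs = sys.argv[1:] or ["flink-1.0.0/lib"]
    for name in (
        "com.amazon.ws.emr.hadoop.fs.EmrFileSystem",
        "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
    ):
        hits = jars_containing(name, dirs)
        print(name, "->", [str(path) for path in hits] or "not found")
```

An empty result for a directory that is on the job's classpath would be
consistent with the error above; a hit only tells you the class exists in that
jar, not that the jar is actually on the classpath at runtime.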

Thanks,
Timur


On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Yes, for sure.
>
> I added some documentation for AWS here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>
> Would be nice to update that page with your pull request. :-)
>
> – Ufuk
>
>
> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org> wrote:
> > Hi Timur,
> >
> > Great! A bootstrap action for Flink would be good for AWS users. I think the
> > bootstrap action scripts would go in the `flink-contrib` directory.
> >
> > If you want, one of the Flink PMC members will assign FLINK-1337 to you.
> >
> > Regards,
> > Chiwan Park
> >
> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <ti...@gmail.com>
> wrote:
> >>
> >> I had a guide like that.
> >>
> >
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Ufuk Celebi <uc...@apache.org>.
Yes, for sure.

I added some documentation for AWS here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html

Would be nice to update that page with your pull request. :-)

– Ufuk


On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <ch...@apache.org> wrote:
> Hi Timur,
>
> Great! A bootstrap action for Flink would be good for AWS users. I think the bootstrap action scripts would go in the `flink-contrib` directory.
>
> If you want, one of the Flink PMC members will assign FLINK-1337 to you.
>
> Regards,
> Chiwan Park
>
>> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <ti...@gmail.com> wrote:
>>
>> I had a guide like that.
>>
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Chiwan Park <ch...@apache.org>.
Hi Timur,

Great! A bootstrap action for Flink would be good for AWS users. I think the bootstrap action scripts would go in the `flink-contrib` directory.

If you want, one of the Flink PMC members will assign FLINK-1337 to you.

Regards,
Chiwan Park

> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <ti...@gmail.com> wrote:
> 
> I had a guide like that.
> 


Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
Yes, the Hadoop version was the culprit. It turns out that EMRFS requires at
least Hadoop 2.4.0 (judging from the exception in the initial post; I was not
able to find the official requirements).

Rebuilding Flink with Hadoop 2.7.1 and Scala 2.11 worked like a charm,
and I was able to run WordCount using S3 for both inputs and outputs. I did
*not* need to change any configuration (as outlined at
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html
).
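
As a rough pre-flight check, the version that Flink was built against can be
compared with the 2.4.0 threshold mentioned above. This is only a sketch: the
threshold is inferred from the exception in this thread rather than from any
official requirement, and the function names are hypothetical. It expects a
string such as the first line printed by `hadoop version`.

```python
import re


def parse_hadoop_version(text):
    """Extract (major, minor, patch) from text such as 'Hadoop 2.7.1'."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError("no x.y.z version found in: %r" % (text,))
    return tuple(int(part) for part in match.groups())


def meets_minimum(version_text, minimum=(2, 4, 0)):
    """Return True if the version in version_text is at least `minimum`.

    The default of 2.4.0 is the value inferred in this thread, not an
    officially documented EMRFS requirement.
    """
    return parse_hadoop_version(version_text) >= minimum


if __name__ == "__main__":
    for candidate in ("Hadoop 2.7.1", "Hadoop 2.3.0"):
        print(candidate, "->", meets_minimum(candidate))
```

Tuples compare element by element, so (2, 7, 1) >= (2, 4, 0) holds while
(2, 3, 0) >= (2, 4, 0) does not; any vendor suffix after the x.y.z part is
ignored by the regular expression.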

Thanks for bearing with me, Ufuk!

While looking for the solution I found this issue:
https://issues.apache.org/jira/browse/FLINK-1337. I have a setup for an EMR
cluster now, so I can make a PR describing it, if it's still relevant. I,
for one, would have saved a couple of days if I had had a guide like that.

Thanks,
Timur


On Tue, Apr 5, 2016 at 10:43 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Timur,
>
> if you are using EMR with IAM roles, Flink should work out of the box.
> You don't need to change the Hadoop config and the IAM role takes care
> of setting up all credentials at runtime. You don't need to hardcode
> any keys in your application that way and this is the recommended way
> to go in order to not worry about securely exchanging the keys and
> then keeping them secure afterwards.
>
> With EMR 4.4.0 you have to use a Flink binary version built against
> Hadoop 2.7. Did you do that? Can you please retry with an
> out-of-the-box Flink and just run it like this:
>
> HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.
>
> Hope this helps! Please report back. :-)
>
> – Ufuk
>
>
> On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <ti...@gmail.com>
> wrote:
> > Hello Ufuk,
> >
> > I'm using EMR 4.4.0.
> >
> > Thanks,
> > Timur
> >
> > On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <uc...@apache.org> wrote:
> >>
> >> Hey Timur,
> >>
> >> which EMR version are you using?
> >>
> >> – Ufuk
> >>
> >> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <
> timur.fairuzov@gmail.com>
> >> wrote:
> >> > Thanks for the answer, Ken.
> >> >
> >> > My understanding is that file system selection is driven by the
> >> > following
> >> > sections in core-site.xml:
> >> > <property>
> >> >   <name>fs.s3.impl</name>
> >> >   <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!--
> >> > This
> >> > was the original value -->
> >> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> >> > </property>
> >> >
> >> > <property>
> >> >   <name>fs.s3n.impl</name>
> >> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> >> > </property>
> >> >
> >> > If I run the program using configuration above with s3n (and also
> >> > modifying
> >> > credential keys to use s3n) it fails with the same error, but there is
> >> > no
> >> > "... opening key ..." logs. S3a seems to be not supported, it fails
> with
> >> > the
> >> > following:
> >> > Caused by: java.io.IOException: No file system found with scheme s3a,
> >> > referenced in file URI 's3a://<my key>'.
> >> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> >> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> >> > at
> >> >
> >> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> >> > ... 23 more
> >> >
> >> > I am puzzled by the fact that EMRFS is still apparently referenced
> >> > somewhere
> >> > as an implementation for S3 protocol, I'm not able to locate where
> this
> >> > configuration is set.
> >> >
> >> >
> >> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
> >> > <kk...@transpac.com>
> >> > wrote:
> >> >>
> >> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> >> >> s3.
> >> >>
> >> >> Though EMR has some support for magically treating the s3 protocol as
> >> >> s3n
> >> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >> >>
> >> >> What happens if you use s3n://<key info>/<path to file> for the
> --input
> >> >> parameter?
> >> >>
> >> >> — Ken
> >> >>
> >> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <ti...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> Hello,
> >> >>
> >> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
> >> >> succeeded
> >> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
> >> >> Flink
> >> >> Job, unload outputs from HDFS to S3.
> >> >>
> >> >> However, ideally I'd like to read/write data directly from/to S3. I
> >> >> followed the instructions here:
> >> >>
> >> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
> ,
> >> >> more specifically I:
> >> >>   1. Modified flink-conf to point to /etc/hadoop/conf
> >> >>   2. Modified core-site.xml per link above (not clear why it is not
> >> >> using IAM; I had to provide AWS keys explicitly).
> >> >>
> >> >> Run the following command:
> >> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
> >> >> yarn-cluster
> >> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar
> >> >> --input
> >> >> s3://<my key> --output hdfs:///flink-output
> >> >>
> >> >> First, I see messages like that:
> >> >> 2016-04-04 21:37:10,418 INFO
> >> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem              -
> Opening
> >> >> key
> >> >> '<my key>' for reading at position '333000'
> >> >>
> >> >> Then, it fails with the following error:
> >> >>
> >> >> ------------------------------------------------------------
> >> >>
> >> >>  The program finished with the following exception:
> >> >>
> >> >>
> >> >> org.apache.flink.client.program.ProgramInvocationException: The
> program
> >> >> execution failed: Failed to submit job
> fc13373d993539e647f164e12d82bf90
> >> >> (WordCount Example)
> >> >>
> >> >> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >> >>
> >> >> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >> >>
> >> >> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
> >> >>
> >> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >>
> >> >> at
> >> >>
> >> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >>
> >> >> at
> >> >>
> >> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >>
> >> >> at java.lang.reflect.Method.invoke(Method.java:606)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >> >>
> >> >> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >> >>
> >> >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >> >>
> >> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >> >>
> >> >> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> >> >> Failed
> >> >> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
> >> >>
> >> >> at
> >> >>
> >> >> org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
> >> >>
> >> >> at
> >> >>
> >> >>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
> >> >>
> >> >> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> >> >>
> >> >> at
> >> >>
> >> >>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> >> >>
> >> >> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> >> >>
> >> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
> >> >>
> >> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >> >>
> >> >> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >> >>
> >> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> >> >>
> >> >> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> >> >>
> >> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> >> >>
> >> >> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >>
> >> >> at
> >> >>
> >> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >>
> >> >> at
> >> >>
> >> >>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >>
> >> >> at
> >> >>
> >> >>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >>
> >> >> Caused by: org.apache.flink.runtime.JobException: Creating the input
> >> >> splits caused an error:
> >> >>
> >> >>
> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
> >> >>
> >> >> at
> >> >>
> >> >> org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
> >> >>
> >> >> ... 21 more
> >> >>
> >> >> Caused by: java.lang.NoSuchMethodError:
> >> >>
> >> >>
> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
> >> >>
> >> >> at
> >> >>
> >> >>
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
> >> >>
> >> >> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
> >> >>
> >> >> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> >> >>
> >> >> at
> >> >>
> >> >>
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> >> >>
> >> >> ... 23 more
> >> >>
> >> >>
> >> >> Somehow, it still tries to use EMRFS (which may be a valid thing?), but
> >> >> it fails to initialize. I don't know enough about EMRFS/S3 interop, so I
> >> >> don't know how to diagnose it further.
> >> >>
> >> >> I run Flink 1.0.0 compiled for Scala 2.11.
> >> >>
> >> >> Any advice on how to make it work is highly appreciated.
> >> >>
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Timur
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --------------------------
> >> >> Ken Krugler
> >> >> +1 530-210-6378
> >> >> http://www.scaleunlimited.com
> >> >> custom big data solutions & training
> >> >> Hadoop, Cascading, Cassandra & Solr
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >
> >
> >
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Timur,

if you are using EMR with IAM roles, Flink should work out of the box.
You don't need to change the Hadoop config and the IAM role takes care
of setting up all credentials at runtime. You don't need to hardcode
any keys in your application that way and this is the recommended way
to go in order to not worry about securely exchanging the keys and
then keeping them secure afterwards.
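
To see which filesystem classes the cluster's Hadoop config currently maps to
the S3 schemes (for example the fs.s3.impl and fs.s3n.impl properties discussed
in this thread), core-site.xml can be inspected directly. The following Python
sketch is an illustration under assumptions: the function name filesystem_impls
is hypothetical, and the fallback directory /etc/hadoop/conf is simply the one
used in the commands in this thread.

```python
import os
import xml.etree.ElementTree as ET


def filesystem_impls(core_site_xml):
    """Map each fs.<scheme>.impl property in core-site.xml content to its value.

    Values that are commented out in the XML are ignored by the parser, so
    only the active setting for each property is reported.
    """
    root = ET.fromstring(core_site_xml)
    impls = {}
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        if name.startswith("fs.") and name.endswith(".impl"):
            impls[name] = value
    return impls


if __name__ == "__main__":
    conf_dir = os.environ.get("HADOOP_CONF_DIR", "/etc/hadoop/conf")
    path = os.path.join(conf_dir, "core-site.xml")
    if os.path.exists(path):
        # Read bytes so the XML declaration's encoding is honoured by the parser.
        with open(path, "rb") as handle:
            for key, val in sorted(filesystem_impls(handle.read()).items()):
                print(key, "=", val)
    else:
        print("no core-site.xml found at", path)
```

If the output still names an EMRFS class for a scheme after the config was
edited, the job is probably reading a different core-site.xml than the one that
was changed.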

With EMR 4.4.0 you have to use a Flink binary version built against
Hadoop 2.7. Did you do that? Can you please retry with an
out-of-the-box Flink and just run it like this:

HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.

Hope this helps! Please report back. :-)

– Ufuk


On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <ti...@gmail.com> wrote:
> Hello Ufuk,
>
> I'm using EMR 4.4.0.
>
> Thanks,
> Timur
>
> On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <uc...@apache.org> wrote:
>>
>> Hey Timur,
>>
>> which EMR version are you using?
>>
>> – Ufuk
>>
>> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <ti...@gmail.com>
>> wrote:
>> > Thanks for the answer, Ken.
>> >
>> > My understanding is that file system selection is driven by the
>> > following
>> > sections in core-site.xml:
>> > <property>
>> >   <name>fs.s3.impl</name>
>> >   <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!--
>> > This
>> > was the original value -->
>> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> > </property>
>> >
>> > <property>
>> >   <name>fs.s3n.impl</name>
>> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>> > </property>
>> >
>> > If I run the program using configuration above with s3n (and also
>> > modifying
>> > credential keys to use s3n) it fails with the same error, but there is
>> > no
>> > "... opening key ..." logs. S3a seems to be not supported, it fails with
>> > the
>> > following:
>> > Caused by: java.io.IOException: No file system found with scheme s3a,
>> > referenced in file URI 's3a://<my key>'.
>> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
>> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> > at
>> >
>> > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> > ... 23 more
>> >
>> > I am puzzled by the fact that EMRFS is still apparently referenced
>> > somewhere
>> > as an implementation for S3 protocol, I'm not able to locate where this
>> > configuration is set.
>> >
>> >
>> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
>> > <kk...@transpac.com>
>> > wrote:
>> >>
>> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
>> >> s3.
>> >>
>> >> Though EMR has some support for magically treating the s3 protocol as
>> >> s3n
>> >> (or maybe s3a now, with Hadoop 2.6 or later)
>> >>
>> >> What happens if you use s3n://<key info>/<path to file> for the --input
>> >> parameter?
>> >>
>> >> — Ken
>> >>
>> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <ti...@gmail.com>
>> >> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
>> >> succeeded
>> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
>> >> Flink
>> >> Job, unload outputs from HDFS to S3.
>> >>
>> >> However, ideally I'd like to read/write data directly from/to S3. I
>> >> followed the instructions here:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
>> >> more specifically I:
>> >>   1. Modified flink-conf to point to /etc/hadoop/conf
>> >>   2. Modified core-site.xml per link above (not clear why it is not
>> >> using IAM; I had to provide AWS keys explicitly).
>> >>
>> >> Run the following command:
>> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
>> >> yarn-cluster
>> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar
>> >> --input
>> >> s3://<my key> --output hdfs:///flink-output
>> >>
>> >> First, I see messages like that:
>> >> 2016-04-04 21:37:10,418 INFO
>> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem              - Opening
>> >> key
>> >> '<my key>' for reading at position '333000'
>> >>
>> >> Then, it fails with the following error:
>> >>
>> >> ------------------------------------------------------------
>> >>
>> >>  The program finished with the following exception:
>> >>
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The program
>> >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
>> >> (WordCount Example)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>> >>
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>
>> >> at
>> >>
>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>
>> >> at
>> >>
>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>
>> >> at java.lang.reflect.Method.invoke(Method.java:606)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>
>> >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>
>> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>
>> >> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> >> Failed
>> >> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>> >>
>> >> at
>> >>
>> >> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>> >>
>> >> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> >>
>> >> at
>> >>
>> >> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> >>
>> >> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> >>
>> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>> >>
>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >>
>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >>
>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >>
>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >>
>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >>
>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>
>> >> Caused by: org.apache.flink.runtime.JobException: Creating the input
>> >> splits caused an error:
>> >>
>> >> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>> >>
>> >> ... 21 more
>> >>
>> >> Caused by: java.lang.NoSuchMethodError:
>> >>
>> >> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>
>> >> at
>> >>
>> >> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
>> >>
>> >> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
>> >>
>> >> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> >>
>> >> ... 23 more
>> >>
>> >>
>> >> Somehow, it still tries to use EMRFS (which may be a valid thing?), but
>> >> it fails to initialize. I don't know enough about EMRFS/S3 interop, so I
>> >> don't know how to diagnose it further.
>> >>
>> >> I run Flink 1.0.0 compiled for Scala 2.11.
>> >>
>> >> Any advice on how to make it work is highly appreciated.
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Timur
>> >>
>> >>
>> >>
>> >>
>> >> --------------------------
>> >> Ken Krugler
>> >> +1 530-210-6378
>> >> http://www.scaleunlimited.com
>> >> custom big data solutions & training
>> >> Hadoop, Cascading, Cassandra & Solr
>> >>
>> >>
>> >>
>> >>
>> >>
>> >
>
>

Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
Hello Ufuk,

I'm using EMR 4.4.0.

Thanks,
Timur

On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Timur,
>
> which EMR version are you using?
>
> – Ufuk
>
> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <ti...@gmail.com>
> wrote:
> > Thanks for the answer, Ken.
> >
> > My understanding is that file system selection is driven by the following
> > sections in core-site.xml:
> > <property>
> >   <name>fs.s3.impl</name>
> >   <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!--
> This
> > was the original value -->
> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> > </property>
> >
> > <property>
> >   <name>fs.s3n.impl</name>
> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> > </property>
> >
> > If I run the program using configuration above with s3n (and also
> modifying
> > credential keys to use s3n) it fails with the same error, but there is no
> > "... opening key ..." logs. S3a seems to be not supported, it fails with
> the
> > following:
> > Caused by: java.io.IOException: No file system found with scheme s3a,
> > referenced in file URI 's3a://<my key>'.
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> > at
> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> > ... 23 more
> >
> > I am puzzled by the fact that EMRFS is still apparently referenced
> somewhere
> > as an implementation for S3 protocol, I'm not able to locate where this
> > configuration is set.
> >
> >
> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_lists@transpac.com
> >
> > wrote:
> >>
> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> s3.
> >>
> >> Though EMR has some support for magically treating the s3 protocol as
> s3n
> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >>
> >> What happens if you use s3n://<key info>/<path to file> for the --input
> >> parameter?
> >>
> >> — Ken
> >>
> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <ti...@gmail.com>
> >> wrote:
> >>
> >> Hello,
> >>
> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
> succeeded
> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
> Flink
> >> Job, unload outputs from HDFS to S3.
> >>
> >> However, ideally I'd like to read/write data directly from/to S3. I
> >> followed the instructions here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
> ,
> >> more specifically I:
> >>   1. Modified flink-conf to point to /etc/hadoop/conf
> >>   2. Modified core-site.xml per link above (not clear why it is not
> >> using IAM; I had to provide AWS keys explicitly).
> >>
> >> Run the following command:
> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
> yarn-cluster
> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> >> s3://<my key> --output hdfs:///flink-output
> >>
> >> First, I see messages like that:
> >> 2016-04-04 21:37:10,418 INFO
> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem              - Opening
> key
> >> '<my key>' for reading at position '333000'

Re: Integrate Flink with S3 on EMR cluster

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Timur,

Which EMR version are you using?

– Ufuk

On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <ti...@gmail.com> wrote:
> Thanks for the answer, Ken.
>
> My understanding is that file system selection is driven by the following
> sections in core-site.xml:
> <property>
>   <name>fs.s3.impl</name>
>   <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!-- This was the original value -->
>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> </property>
>
> <property>
>   <name>fs.s3n.impl</name>
>   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> </property>
>
> If I run the program using configuration above with s3n (and also modifying
> credential keys to use s3n) it fails with the same error, but there is no
> "... opening key ..." logs. S3a seems to be not supported, it fails with the
> following:
> Caused by: java.io.IOException: No file system found with scheme s3a, referenced in file URI 's3a://<my key>'.
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> ... 23 more
>
> I am puzzled by the fact that EMRFS is still apparently referenced somewhere
> as an implementation for S3 protocol, I'm not able to locate where this
> configuration is set.

Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov <ti...@gmail.com>.
Thanks for the answer, Ken.

My understanding is that file system selection is driven by the following
sections in core-site.xml:
<property>
  <name>fs.s3.impl</name>
  <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!-- This was the original value -->
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

If I run the program with the configuration above and an s3n:// input
(after also changing the credential keys to their s3n variants), it fails
with the same error, but no "... opening key ..." messages are logged.
s3a does not seem to be supported; it fails with the following:
Caused by: java.io.IOException: No file system found with scheme s3a, referenced in file URI 's3a://<my key>'.
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 23 more

I am puzzled that EMRFS is apparently still referenced somewhere as an
implementation for the S3 protocol; I have not been able to locate where
this configuration is set.
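
One way to track down where EMRFS is still configured is to list every
fs.<scheme>.impl property in each Hadoop-style XML file of a configuration
directory. Below is a minimal Python sketch, assuming the usual
<configuration>/<property>/<name>/<value> layout shown above. The function
name find_fs_impls is made up for illustration and is not part of Flink,
Hadoop or EMR.

```python
import re
import xml.etree.ElementTree as ET
from pathlib import Path

# Matches Hadoop file system mappings such as fs.s3.impl or fs.s3n.impl.
FS_IMPL = re.compile(r"^fs\.([A-Za-z0-9]+)\.impl$")


def find_fs_impls(conf_dir):
    """Return (file name, scheme, implementation class) tuples for every
    fs.<scheme>.impl property in the *.xml files directly under conf_dir.

    Files are visited in name order; files that are not well-formed XML
    are skipped. Commented-out <value> elements are ignored because the
    default ElementTree parser drops XML comments.
    """
    found = []
    for xml_file in sorted(Path(conf_dir).glob("*.xml")):
        try:
            root = ET.parse(xml_file).getroot()
        except ET.ParseError:
            continue
        for prop in root.iter("property"):
            name = (prop.findtext("name") or "").strip()
            value = (prop.findtext("value") or "").strip()
            match = FS_IMPL.match(name)
            if match:
                found.append((xml_file.name, match.group(1), value))
    return found
```

Calling find_fs_impls("/etc/hadoop/conf"), and doing the same for any other
configuration directory visible to the YARN containers, should show which
file still maps a scheme to com.amazon.ws.emr.hadoop.fs.EmrFileSystem. A
scheme with no entry at all would be consistent with the "No file system
found with scheme s3a" error, assuming Flink resolves such schemes through
these Hadoop properties.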


On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kk...@transpac.com>
wrote:

> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.
>
> Though EMR has some support for magically treating the s3 protocol as s3n
> (or maybe s3a now, with Hadoop 2.6 or later)
>
> What happens if you use s3n://<key info>/<path to file> for the --input
> parameter?
>
> — Ken

Re: Integrate Flink with S3 on EMR cluster

Posted by Ken Krugler <kk...@transpac.com>.
Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.

EMR does have some support for magically treating the s3 protocol as s3n (or maybe s3a now, with Hadoop 2.6 or later), though.

What happens if you use s3n://<key info>/<path to file> for the --input parameter?

— Ken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr