Posted to user@flink.apache.org by Senthil Kumar <se...@vmware.com> on 2020/01/17 14:57:13 UTC

Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Hello all,

Newbie here!

We are running on Amazon EMR with the following installed in the EMR Software Configuration:

Hadoop 2.8.5
JupyterHub 1.0.0
Ganglia 3.7.2
Hive 2.3.6
Flink 1.9.0

I am trying to run a streaming job that reads from one S3 bucket and writes to another S3 bucket using the StreamingFileSink.

I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer

According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in /usr/lib/flink/lib:

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov

That did not work.

Further googling revealed that for Flink 1.9.0 and above (according to this):

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/

it seems that I need to install the jar file in the plugins directory (/usr/lib/flink/plugins/s3-fs-hadoop).

That did not work either.

At this point, I am not sure what to do and would appreciate some help!

Cheers
Kumar


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Arvid Heise <ar...@ververica.com>.
Sorry, I meant to respond to Senthil.
Thank you, Aaron, for providing help.

One more thing that may be confusing the first time you use plugins:
each plugin needs to go in its own folder. We improved the documentation on
this in the upcoming 1.10 release:

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3 (name is arbitrary)
        └── flink-s3-fs-hadoop.jar
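
For illustration, the layout above could be created with a small shell helper along these lines. This is only a sketch: it assumes the S3 jar is shipped under opt/ of the Flink installation (e.g. /usr/lib/flink/opt on EMR), and the function name install_s3_plugin is made up for this example.

```shell
#!/usr/bin/env bash
# Sketch: copy the S3 filesystem jar into its own subfolder under plugins/.
# Assumption: the jar is available as $FLINK_HOME/opt/flink-s3-fs-hadoop-<version>.jar.
# install_s3_plugin is a hypothetical helper name, not part of Flink or EMR.

install_s3_plugin() {
  local flink_home="$1"                    # e.g. /usr/lib/flink
  local plugin_name="${2:-s3-fs-hadoop}"   # the folder name is arbitrary
  local jar

  # Pick the first matching jar from opt/; give up if there is none.
  jar="$(ls "$flink_home"/opt/flink-s3-fs-hadoop-*.jar 2>/dev/null | head -n 1)"
  if [ -z "$jar" ]; then
    echo "no flink-s3-fs-hadoop jar found under $flink_home/opt" >&2
    return 1
  fi

  # One folder per plugin, with the jar placed inside it.
  mkdir -p "$flink_home/plugins/$plugin_name"
  cp "$jar" "$flink_home/plugins/$plugin_name/"
}

# Example call (path is an assumption about the EMR layout):
#   install_s3_plugin /usr/lib/flink s3
```

The helper would presumably have to run before the Flink processes are started, since a process that is already running is unlikely to pick up a newly added plugin.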


On Tue, Jan 28, 2020 at 9:18 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Aaron,
>
> I encountered a similar issue when running on EMR. On the slaves, there
> are some lingering hadoop versions that are older than 2.7 (it was 2.6 if I
> remember correctly), which bleed into the classpath of Flink.
> Flink checks the Hadoop version to check if certain capabilities like file
> truncations are available or not. In your case, it loads the version info
> of the old jar first and determines that file truncation is not available.
>
> I'm currently working on a fix [1]. Then, Flink will only check the Hadoop
> version of the Hadoop that is bundled in the plugin and not the one first
> in the classpath.
>
> As a workaround, you could try the following options:
> - Don't use s3 as a plugin but put the jar in lib/ (that's not working
> anymore in Flink 1.10+ though)
> - Or connect to your slaves, find the old hadoop-common*.jar, and remove it
> manually. The location of all relevant hadoop-common*.jar files should be
> visible in the task manager logs. If they have version number 2.7 or newer,
> they are good; all others may result in issues.
>
> Ping me if you need further assistance.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15777
>
> On Fri, Jan 24, 2020 at 5:30 PM Aaron Langford <aa...@gmail.com>
> wrote:
>
>> This seems to confirm that the S3 file system implementation is not being
>> loaded when you start your job.
>>
>> Can you share the details of how you are getting the flink-s3-fs-hadoop artifact
>> onto your cluster? Are you simply ssh-ing to the master node and doing this
>> manually? Are you doing this via a bootstrap action? Timing of this
>> action would be relevant as well.
>>
>> Aaron
>>
>> On Fri, Jan 24, 2020 at 8:12 AM Senthil Kumar <se...@vmware.com>
>> wrote:
>>
>>> Thanks, here’s the debug output. It looks like we need to set up the
>>> hdfs-config file in the Flink config.
>>>
>>> Could you advise us further?
>>>
>>>
>>>
>>> --
>>>
>>>
>>>
>>> 2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem - Loading extension file systems via services
>>> 2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem - Added file system maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
>>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-default configuration-file path in Flink config.
>>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-site configuration-file path in Flink config.
>>>
>>>
>>>
>>>
>>>
>>> *From: *Aaron Langford <aa...@gmail.com>
>>> *Date: *Thursday, January 23, 2020 at 12:22 PM
>>> *To: *Senthil Kumar <se...@vmware.com>
>>> *Cc: *Yang Wang <da...@gmail.com>, "user@flink.apache.org" <
>>> user@flink.apache.org>
>>> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on
>>> EMR)
>>>
>>>
>>>
>>> When creating your cluster, you can provide configurations that EMR will
>>> find the right home for. Example for the aws cli:
>>>
>>>
>>>
>>> aws emr create-cluster ... --configurations '[{
>>>     "Classification": "flink-log4j",
>>>     "Properties": {
>>>       "log4j.rootLogger": "DEBUG,file"
>>>     }
>>>   },{
>>>     "Classification": "flink-log4j-yarn-session",
>>>     "Properties": {
>>>       "log4j.rootLogger": "DEBUG,stdout"
>>>     }
>>>   }]'
>>>
>>>
>>>
>>> If you can't take down your existing EMR cluster for some reason, you
>>> can ask AWS to modify these configurations for you on the cluster. They
>>> should take effect when you start a new Flink job (new job manager as well
>>> as a new job in that job manager). It is my understanding that
>>> configuration changes require a restart of a flink jobmanager + topology in
>>> order to take effect. Here's an example of how to modify an existing
>>> cluster (I just threw this together, so beware malformed JSON):
>>>
>>>
>>>
>>> aws emr modify-instance-groups --cli-input-json '{
>>>     "ClusterId": "<your cluster id>",
>>>     "InstanceGroups": [{
>>>         "InstanceGroupId": "<master instance group id>",
>>>         "Configurations": [{
>>>             "Classification": "flink-log4j",
>>>             "Properties": {
>>>                 "log4j.rootLogger": "DEBUG,file"
>>>             }
>>>         },{
>>>             "Classification": "flink-log4j-yarn-session",
>>>             "Properties": {
>>>                 "log4j.rootLogger": "DEBUG,stdout"
>>>             }
>>>         }]
>>>     },{
>>>         "InstanceGroupId": "<core instance group id>",
>>>         "Configurations": [{
>>>             "Classification": "flink-log4j",
>>>             "Properties": {
>>>                 "log4j.rootLogger": "DEBUG,file"
>>>             }
>>>         },{
>>>             "Classification": "flink-log4j-yarn-session",
>>>             "Properties": {
>>>                 "log4j.rootLogger": "DEBUG,stdout"
>>>             }
>>>         }]
>>>      }]
>>> }'
>>>
>>>
>>>
>>> On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar <se...@vmware.com>
>>> wrote:
>>>
>>> Could you tell us how to turn on debug level logs?
>>>
>>>
>>>
>>> We attempted this (on driver)
>>>
>>>
>>>
>>> sudo stop hadoop-yarn-resourcemanager
>>>
>>>
>>>
>>> followed the instructions here
>>>
>>>
>>> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
>>>
>>>
>>>
>>> and
>>>
>>>
>>>
>>> sudo start hadoop-yarn-resourcemanager
>>>
>>>
>>>
>>> but we still don’t see any debug level logs
>>>
>>>
>>>
>>> Any further info is much appreciated!
>>>
>>>
>>>
>>>
>>>
>>> *From: *Aaron Langford <aa...@gmail.com>
>>> *Date: *Tuesday, January 21, 2020 at 10:54 AM
>>> *To: *Senthil Kumar <se...@vmware.com>
>>> *Cc: *Yang Wang <da...@gmail.com>, "user@flink.apache.org" <
>>> user@flink.apache.org>
>>> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on
>>> EMR)
>>>
>>>
>>>
>>> Senthil,
>>>
>>>
>>>
>>> One of the key steps in debugging this for me was enabling debug level
>>> logs on my cluster, and then looking at the logs in the resource manager.
>>> The failure you are after happens before the exceptions you have reported
>>> here. When your Flink application is starting, it will attempt to load
>>> various file system implementations. You can see which ones it successfully
>>> loaded when you have the debug level of logs configured. You will have to
>>> do some digging, but this is a good place to start. Try to discover if your
>>> application is indeed loading the s3 file system, or if that is not
>>> happening. You should be able to find the file system implementations that
>>> were loaded by searching for the string "Added file system".
>>>
>>>
>>>
>>> Also, do you mind sharing the bootstrap action script that you are using
>>> to get the s3 file system in place?
>>>
>>>
>>>
>>> Aaron
>>>
>>>
>>>
>>> On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com>
>>> wrote:
>>>
>>> Yang, I appreciate your help! Please let me know if I can provide
>>> any other info.
>>>
>>>
>>>
>>> I resubmitted my executable jar file as a step to the Flink EMR cluster,
>>> and here are all the exceptions. I see two of them.
>>>
>>>
>>>
>>> I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
>>>
>>>
>>>
>>> 2020-01-21 16:31:37,587 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.
>>> java.lang.NullPointerException
>>>         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>>
>>>
>>> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>>>         at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>>>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>>>         at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>>>         at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>>>         at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>>>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>>
>>>
>>> *From: *Yang Wang <da...@gmail.com>
>>> *Date: *Saturday, January 18, 2020 at 7:58 PM
>>> *To: *Senthil Kumar <se...@vmware.com>
>>> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
>>> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on
>>> EMR)
>>>
>>>
>>>
>>> I think this exception is not because the Hadoop version isn't high
>>> enough.
>>>
>>> It seems that the "s3" URI scheme could not be recognized by
>>> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>>>
>>>
>>>
>>> Could you share the debug-level jobmanager/taskmanager logs so that we
>>> can confirm whether the classpath and FileSystem are loaded correctly?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Best,
>>>
>>> Yang
>>>
>>>
>>>

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Arvid Heise <ar...@ververica.com>.
Hi Aaron,

I encountered a similar issue when running on EMR. On the slaves, there are
some lingering hadoop versions that are older than 2.7 (it was 2.6 if I
remember correctly), which bleed into the classpath of Flink.
Flink checks the Hadoop version to check if certain capabilities like file
truncations are available or not. In your case, it loads the version info
of the old jar first and determines that file truncation is not available.

I'm currently working on a fix [1]. Then, Flink will only check the Hadoop
version of the Hadoop that is bundled in the plugin and not the one first
in the classpath.

As a workaround, you could try the following options:
- Don't use s3 as a plugin but put the jar in lib/ (that's not working
anymore in Flink 1.10+ though)
- Or connect to your slaves, find the old hadoop-common*.jar, and remove it
manually. The location of all relevant hadoop-common*.jar files should be
visible in the task manager logs. If they have version number 2.7 or newer,
they are good; all others may result in issues.
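
As a rough aid for the second workaround, something like the following could list the hadoop-common jars under a directory and flag those older than 2.7. It is a sketch only: it assumes the version is encoded in the file name as hadoop-common-<major>.<minor>..., and both function names are made up for this example. Which directory to scan is left to the caller, since the relevant locations come from the task manager logs.

```shell
#!/usr/bin/env bash
# Sketch: classify hadoop-common jars by the version in their file name.
# Assumption: names look like hadoop-common-<major>.<minor>[...].jar.
# hadoop_common_status and scan_hadoop_common are hypothetical helper names.

hadoop_common_status() {
  local jar_name="$1" version major minor
  version="${jar_name#hadoop-common-}"   # e.g. "2.6.0.jar"
  major="${version%%.*}"                 # text before the first dot
  version="${version#*.}"                # drop "<major>."
  minor="${version%%[!0-9]*}"            # leading digits of the remainder

  # Anything that does not parse as two numbers is reported as unknown.
  case "$major" in ''|*[!0-9]*) echo "unknown"; return ;; esac
  case "$minor" in '') echo "unknown"; return ;; esac

  if [ "$major" -gt 2 ] || { [ "$major" -eq 2 ] && [ "$minor" -ge 7 ]; }; then
    echo "ok"
  else
    echo "too-old"
  fi
}

scan_hadoop_common() {
  local root="$1" path
  # Print "<status><TAB><path>" for every matching jar below $root.
  find "$root" -name 'hadoop-common-*.jar' 2>/dev/null | while read -r path; do
    printf '%s\t%s\n' "$(hadoop_common_status "$(basename "$path")")" "$path"
  done
}

# Example call (directory is a placeholder):
#   scan_hadoop_common /path/to/search
```

The script only reports; whether a jar flagged as too-old can safely be removed still needs to be checked by hand on each node.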

Ping me if you need further assistance.

[1] https://issues.apache.org/jira/browse/FLINK-15777


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Aaron Langford <aa...@gmail.com>.
This seems to confirm that the S3 file system implementation is not being
loaded when you start your job.
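
To make that check repeatable, the registered file systems can be pulled out of a DEBUG-level log with a couple of small shell functions. This is a sketch under the assumption that the log lines look like the excerpt quoted in this thread ("... - Added file system <scheme>:<factory class>" on a single line); the function names and the log path are placeholders.

```shell
#!/usr/bin/env bash
# Sketch: list the file system factories Flink reported at DEBUG level.
# Assumption: each entry is a single log line containing
# "Added file system <scheme>:<factory class>".
# list_added_file_systems and has_s3_file_system are hypothetical helper names.

list_added_file_systems() {
  local log_file="$1"
  # Keep only the "<scheme>:<factory>" part after the marker string.
  grep -o 'Added file system .*' "$log_file" | sed 's/^Added file system //'
}

has_s3_file_system() {
  # Succeeds if any registered scheme starts with "s3" (s3, s3a, ...).
  list_added_file_systems "$1" | grep -q '^s3[a-z]*:'
}

# Example call (log path is a placeholder):
#   has_s3_file_system /path/to/jobmanager.log && echo "s3 file system was added"
```

With the log excerpt quoted in this thread, only the maprfs entry would be listed, which matches the conclusion that the S3 implementation was not loaded.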

Can you share the details of how you are getting the flink-s3-fs-hadoop
artifact onto your cluster? Are you simply ssh-ing to the master node and
doing this manually? Are you doing this via a bootstrap action? Timing of
this action would be relevant as well.

Aaron

On Fri, Jan 24, 2020 at 8:12 AM Senthil Kumar <se...@vmware.com> wrote:

> Thanks, here’s the debug output. It looks like we need to set up the
> hdfs-config file in the Flink config.
>
> Could you advise us further?
>
>
>
> --
>
>
>
> 2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem - Loading extension file systems via services
> 2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem - Added file system maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-default configuration-file path in Flink config.
> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-site configuration-file path in Flink config.
>
>
>
>
>
> *From: *Aaron Langford <aa...@gmail.com>
> *Date: *Thursday, January 23, 2020 at 12:22 PM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *Yang Wang <da...@gmail.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> When creating your cluster, you can provide configurations that EMR will
> find the right home for. Example for the aws cli:
>
>
>
> aws emr create-cluster ... --configurations '[{
>     "Classification": "flink-log4j",
>     "Properties": {
>       "log4j.rootLogger": "DEBUG,file"
>     }
>   },{
>     "Classification": "flink-log4j-yarn-session",
>     "Properties": {
>       "log4j.rootLogger": "DEBUG,stdout"
>     }
>   }]'
>
>
>
> If you can't take down your existing EMR cluster for some reason, you can
> ask AWS to modify these configurations for you on the cluster. They should
> take effect when you start a new Flink job (new job manager as well as a
> new job in that job manager). It is my understanding that configuration
> changes require a restart of a flink jobmanager + topology in order to take
> effect. Here's an example of how to modify an existing cluster (I just
> threw this together, so beware malformed JSON):
>
>
>
> aws emr modify-instance-groups --cli-input-json '{
>     "ClusterId": "<your cluster id>",
>     "InstanceGroups": [{
>         "InstanceGroupId": "<master instance group id>",
>         "Configurations": [{
>             "Classification": "flink-log4j",
>             "Properties": {
>                 "log4j.rootLogger": "DEBUG,file"
>             }
>         },{
>             "Classification": "flink-log4j-yarn-session",
>             "Properties": {
>                 "log4j.rootLogger": "DEBUG,stdout"
>             }
>         }]
>     },{
>         "InstanceGroupId": "<core instance group id>",
>         "Configurations": [{
>             "Classification": "flink-log4j",
>             "Properties": {
>                 "log4j.rootLogger": "DEBUG,file"
>             }
>         },{
>             "Classification": "flink-log4j-yarn-session",
>             "Properties": {
>                 "log4j.rootLogger": "DEBUG,stdout"
>             }
>         }]
>      }]
> }'
>
>
>
> On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar <se...@vmware.com>
> wrote:
>
> Could you tell us how to turn on debug level logs?
>
>
>
> We attempted this (on driver)
>
>
>
> sudo stop hadoop-yarn-resourcemanager
>
>
>
> followed the instructions here
>
>
> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
>
>
>
> and
>
>
>
> sudo start hadoop-yarn-resourcemanager
>
>
>
> but we still don’t see any debug level logs
>
>
>
> Any further info is much appreciated!
>
>
>
>
>
> *From: *Aaron Langford <aa...@gmail.com>
> *Date: *Tuesday, January 21, 2020 at 10:54 AM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *Yang Wang <da...@gmail.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> Senthil,
>
>
>
> One of the key steps in debugging this for me was enabling debug level
> logs on my cluster, and then looking at the logs in the resource manager.
> The failure you are after happens before the exceptions you have reported
> here. When your Flink application is starting, it will attempt to load
> various file system implementations. You can see which ones it successfully
> loaded when you have the debug level of logs configured. You will have to
> do some digging, but this is a good place to start. Try to discover if your
> application is indeed loading the s3 file system, or if that is not
> happening. You should be able to find the file system implementations that
> were loaded by searching for the string "Added file system".
>
>
>
> Also, do you mind sharing the bootstrap action script that you are using
> to get the s3 file system in place?
>
>
>
> Aaron
>
>
>
> On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com>
> wrote:
>
> Yang, I appreciate your help! Please let me know if I can provide you with
> any other info.
>
>
>
> I resubmitted my executable jar file as a step to the Flink EMR cluster, and
> here are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during disposal of stream
> operator.
>
> java.lang.NullPointerException
>
>         at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>
>         at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> *From: *Yang Wang <da...@gmail.com>
> *Date: *Saturday, January 18, 2020 at 7:58 PM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> I think this exception is not because the hadoop version isn't high
> enough.
>
> It seems that the "s3" URI scheme could not be recognized by
> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>
>
>
> Could you share the debug level jobmanager/taskmanager logs so that we
> could confirm whether the classpath and FileSystem are loaded correctly?
>
>
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> On Fri, Jan 17, 2020 at 10:57 PM Senthil Kumar <se...@vmware.com> wrote:
>
> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a Streaming job from one S3 bucket into another S3
> bucket using the StreamingFileSink
>
>
>
> I got the infamous exception:
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>
>
>
> According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
> /usr/lib/flink/lib
>
>
> https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
>
>
>
> That did not work.
>
>
>
> Further googling revealed the following for Flink 1.9.0 and above (according to this):
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>
>
>
> it seems that I need to install the jar file in the plugins directory
> (/usr/lib/flink/plugins/s3-fs-hadoop)
>
>
>
> That did not work either.
>
>
>
> At this point, I am not sure what to do and would appreciate some help!
>
>
>
> Cheers
>
> Kumar
>
>
>
>

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Senthil Kumar <se...@vmware.com>.
Thanks, here’s the debug output. It looks like we need to set up an HDFS config file in the Flink config.
Could you advise us further?

--


2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem                           - Loading extension file systems via services

2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem                           - Added file system maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Cannot find hdfs-default configuration-file path in Flink config.

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Cannot find hdfs-site configuration-file path in Flink config.
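
The check suggested earlier in this thread (search the DEBUG log for "Added file system") can be sketched roughly as below. This is only an illustration: the file name `jobmanager.log` and the temporary directory are placeholders, not real EMR paths, and the sample lines are copied from the output above. It assumes that a loaded S3 factory would be logged with a scheme starting with `s3`.

```shell
#!/usr/bin/env bash
# Hypothetical log location; on a real cluster, point this at the
# JobManager/TaskManager log instead of a temp file.
log_dir="$(mktemp -d)"
log_file="$log_dir/jobmanager.log"

# Sample content copied from the DEBUG output in this message.
cat > "$log_file" <<'EOF'
2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem - Loading extension file systems via services
2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem - Added file system maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
EOF

# Each registered factory appears on an "Added file system" line;
# keep only the scheme:factory part after that phrase.
loaded="$(grep -F 'Added file system' "$log_file" | sed 's/.*Added file system //')"
echo "Loaded file systems:"
echo "$loaded"

# Assumption: an S3 factory would show up with a scheme beginning with "s3".
if echo "$loaded" | grep -q '^s3'; then
  s3_loaded=yes
else
  s3_loaded=no
fi
echo "s3 file system loaded: $s3_loaded"
```

With the log lines above, only the maprfs factory is listed, which would be consistent with the S3 plugin not being picked up.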


From: Aaron Langford <aa...@gmail.com>
Date: Thursday, January 23, 2020 at 12:22 PM
To: Senthil Kumar <se...@vmware.com>
Cc: Yang Wang <da...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

When creating your cluster, you can provide configurations that EMR will find the right home for. Example for the aws cli:

aws emr create-cluster ... --configurations '[{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }]'

If you can't take down your existing EMR cluster for some reason, you can ask AWS to modify these configurations for you on the cluster. They should take effect when you start a new Flink job (new job manager as well as a new job in that job manager). It is my understanding that configuration changes require a restart of a flink jobmanager + topology in order to take effect. Here's an example of how to modify an existing cluster (I just threw this together, so beware malformed JSON):

aws emr modify-instance-groups --cli-input-json '{
    "ClusterId": "<your cluster id>",
    "InstanceGroups": [{
        "InstanceGroupId": "<master instance group id>",
        "Configurations": [{
            "Classification": "flink-log4j",
            "Properties": {
                "log4j.rootLogger": "DEBUG,file"
            }
        },{
            "Classification": "flink-log4j-yarn-session",
            "Properties": {
                "log4j.rootLogger": "DEBUG,stdout"
            }
        }]
    },{
        "InstanceGroupId": "<core instance group id>",
        "Configurations": [{
            "Classification": "flink-log4j",
            "Properties": {
                "log4j.rootLogger": "DEBUG,file"
            }
        },{
            "Classification": "flink-log4j-yarn-session",
            "Properties": {
                "log4j.rootLogger": "DEBUG,stdout"
            }
        }]
     }]
}'

On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar <se...@vmware.com> wrote:
Could you tell us how to turn on debug level logs?

We attempted this (on driver)

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford <aa...@gmail.com>
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar <se...@vmware.com>
Cc: Yang Wang <da...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on my cluster, and then looking at the logs in the resource manager. The failure you are after happens before the exceptions you have reported here. When your Flink application is starting, it will attempt to load various file system implementations. You can see which ones it successfully loaded when you have the debug level of logs configured. You will have to do some digging, but this is a good place to start. Try to discover if your application is indeed loading the s3 file system, or if that is not happening. You should be able to find the file system implementations that were loaded by searching for the string "Added file system".

Also, do you mind sharing the bootstrap action script that you are using to get the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com> wrote:
Yang, I appreciate your help! Please let me know if I can provide you with any other info.

I resubmitted my executable jar file as a step to the Flink EMR cluster, and here are all the exceptions. I see two of them.

I fished them out of /var/log/Hadoop/<STEP-ID>/syslog


2020-01-21 16:31:37,587 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer

        at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)


From: Yang Wang <da...@gmail.com>
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar <se...@vmware.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I think this exception is not because the hadoop version isn't high enough.
It seems that the "s3" URI scheme could not be recognized by `S3FileSystemFactory`, so it falls back to
the `HadoopFsFactory`.

Could you share the debug level jobmanager/taskmanager logs so that we could confirm whether the
classpath and FileSystem are loaded correctly?



Best,
Yang

On Fri, Jan 17, 2020 at 10:57 PM Senthil Kumar <se...@vmware.com> wrote:

Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a Streaming job from one S3 bucket into another S3 bucket using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in /usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed the following for Flink 1.9.0 and above (according to this):

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/



it seems that I need to install the jar file in the plugins directory (/usr/lib/flink/plugins/s3-fs-hadoop)
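
For reference, the plugin layout described here can be sketched as below. This is a hedged illustration, not a verified EMR recipe: a temporary directory stands in for the Flink home (/usr/lib/flink in this message), an empty file stands in for the real jar, and the sketch assumes the distribution ships the jar under opt/. The folder name under plugins/ is arbitrary; what matters is that the jar sits in its own subfolder.

```shell
#!/usr/bin/env bash
# Stand-in for the Flink home directory; a temp dir keeps the sketch harmless.
flink_home="$(mktemp -d)"
jar_name="flink-s3-fs-hadoop-1.9.0.jar"

# Assumption: the distribution ships the jar under opt/.
# An empty placeholder file stands in for the real jar here.
mkdir -p "$flink_home/opt"
: > "$flink_home/opt/$jar_name"

# Each plugin lives in its own subfolder of plugins/ (folder name is arbitrary).
plugin_dir="$flink_home/plugins/s3-fs-hadoop"
mkdir -p "$plugin_dir"
cp "$flink_home/opt/$jar_name" "$plugin_dir/"

# Show the resulting layout.
find "$flink_home/plugins" -type f
```

On a cluster, the same layout would presumably be needed on every node where a Flink process starts, not only on the master.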



That did not work either.



At this point, I am not sure what to do and would appreciate some help!



Cheers

Kumar


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Aaron Langford <aa...@gmail.com>.
When creating your cluster, you can provide configurations that EMR will
find the right home for. Example for the aws cli:

aws emr create-cluster ... --configurations '[{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }]'


If you can't take down your existing EMR cluster for some reason, you can
ask AWS to modify these configurations for you on the cluster. They should
take effect when you start a new Flink job (new job manager as well as a
new job in that job manager). It is my understanding that configuration
changes require a restart of a flink jobmanager + topology in order to take
effect. Here's an example of how to modify an existing cluster (I just
threw this together, so beware malformed JSON):

aws emr modify-instance-groups --cli-input-json '{
    "ClusterId": "<your cluster id>",
    "InstanceGroups": [{
        "InstanceGroupId": "<master instance group id>",
        "Configurations": [{
            "Classification": "flink-log4j",
            "Properties": {
                "log4j.rootLogger": "DEBUG,file"
            }
        },{
            "Classification": "flink-log4j-yarn-session",
            "Properties": {
                "log4j.rootLogger": "DEBUG,stdout"
            }
        }]
    },{
        "InstanceGroupId": "<core instance group id>",
        "Configurations": [{
            "Classification": "flink-log4j",
            "Properties": {
                "log4j.rootLogger": "DEBUG,file"
            }
        },{
            "Classification": "flink-log4j-yarn-session",
            "Properties": {
                "log4j.rootLogger": "DEBUG,stdout"
            }
        }]
     }]
}'
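
Since hand-written JSON is easy to get wrong, one option is to keep the configuration in a file and check it before handing it to the CLI. The sketch below is only an illustration under a few assumptions: `python3` is on the PATH (its `json.tool` module is used purely as a syntax check), the file name `flink-log4j.json` is a placeholder, and the final command is printed rather than executed.

```shell
#!/usr/bin/env bash
# Hypothetical file name; the payload is a well-formed version of the
# two log4j classifications discussed in this thread.
work_dir="$(mktemp -d)"
config_file="$work_dir/flink-log4j.json"

cat > "$config_file" <<'EOF'
[{
  "Classification": "flink-log4j",
  "Properties": { "log4j.rootLogger": "DEBUG,file" }
},{
  "Classification": "flink-log4j-yarn-session",
  "Properties": { "log4j.rootLogger": "DEBUG,stdout" }
}]
EOF

# json.tool exits non-zero on malformed JSON, so a missing brace is caught here.
if python3 -m json.tool "$config_file" > /dev/null 2>&1; then
  json_status=valid
else
  json_status=invalid
fi
echo "configuration JSON is $json_status"

# Only print the command; run it yourself once the JSON checks out.
if [ "$json_status" = "valid" ]; then
  echo "aws emr create-cluster ... --configurations file://$config_file"
fi
```

Passing the file with `file://` also avoids shell-quoting problems with a long inline JSON string.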


On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar <se...@vmware.com> wrote:

> Could you tell us how to turn on debug level logs?
>
>
>
> We attempted this (on driver)
>
>
>
> sudo stop hadoop-yarn-resourcemanager
>
>
>
> followed the instructions here
>
>
> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
>
>
>
> and
>
>
>
> sudo start hadoop-yarn-resourcemanager
>
>
>
> but we still don’t see any debug level logs
>
>
>
> Any further info is much appreciated!
>
>
>
>
>
> *From: *Aaron Langford <aa...@gmail.com>
> *Date: *Tuesday, January 21, 2020 at 10:54 AM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *Yang Wang <da...@gmail.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> Senthil,
>
>
>
> One of the key steps in debugging this for me was enabling debug level
> logs on my cluster, and then looking at the logs in the resource manager.
> The failure you are after happens before the exceptions you have reported
> here. When your Flink application is starting, it will attempt to load
> various file system implementations. You can see which ones it successfully
> loaded when you have the debug level of logs configured. You will have to
> do some digging, but this is a good place to start. Try to discover if your
> application is indeed loading the s3 file system, or if that is not
> happening. You should be able to find the file system implementations that
> were loaded by searching for the string "Added file system".
>
>
>
> Also, do you mind sharing the bootstrap action script that you are using
> to get the s3 file system in place?
>
>
>
> Aaron
>
>
>
> On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com>
> wrote:
>
> Yang, I appreciate your help! Please let me know if I can provide you with
> any other info.
>
>
>
> I resubmitted my executable jar file as a step to the Flink EMR cluster, and
> here are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during disposal of stream
> operator.
>
> java.lang.NullPointerException
>
>         at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>
>         at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> *From: *Yang Wang <da...@gmail.com>
> *Date: *Saturday, January 18, 2020 at 7:58 PM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> I think this exception is not because the hadoop version isn't high
> enough.
>
> It seems that the "s3" URI scheme could not be recognized by
> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>
>
>
> Could you share the debug level jobmanager/taskmanager logs so that we
> could confirm whether the classpath and FileSystem are loaded correctly?
>
>
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> On Fri, Jan 17, 2020 at 10:57 PM Senthil Kumar <se...@vmware.com> wrote:
>
> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a Streaming job from one S3 bucket into another S3
> bucket using the StreamingFileSink
>
>
>
> I got the infamous exception:
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>
>
>
> According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
> /usr/lib/flink/lib
>
>
> https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
>
>
>
> That did not work.
>
>
>
> Further googling revealed the following for Flink 1.9.0 and above (according to this):
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>
>
>
> it seems that I need to install the jar file in the plugins directory
> (/usr/lib/flink/plugins/s3-fs-hadoop)
>
>
>
> That did not work either.
>
>
>
> At this point, I am not sure what to do and would appreciate some help!
>
>
>
> Cheers
>
> Kumar
>
>
>
>

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Senthil Kumar <se...@vmware.com>.
Could you tell us how to turn on debug level logs?

We attempted this (on driver)

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford <aa...@gmail.com>
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar <se...@vmware.com>
Cc: Yang Wang <da...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on my cluster, and then looking at the logs in the resource manager. The failure you are after happens before the exceptions you have reported here. When your Flink application is starting, it will attempt to load various file system implementations. You can see which ones it successfully loaded when you have the debug level of logs configured. You will have to do some digging, but this is a good place to start. Try to discover if your application is indeed loading the s3 file system, or if that is not happening. You should be able to find the file system implementations that were loaded by searching for the string "Added file system".

Also, do you mind sharing the bootstrap action script that you are using to get the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com> wrote:
Yang, I appreciate your help! Please let me know if I can provide you with any other info.

I resubmitted my executable jar file as a step to the Flink EMR cluster, and here are all the exceptions. I see two of them.

I fished them out of /var/log/Hadoop/<STEP-ID>/syslog


2020-01-21 16:31:37,587 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer

        at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)


From: Yang Wang <da...@gmail.com>
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar <se...@vmware.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I don't think this exception is caused by the Hadoop version being too low.
It seems that the "s3" URI scheme was not recognized by `S3FileSystemFactory`, so it falls back to
the `HadoopFsFactory`.

Could you share the debug-level jobmanager/taskmanager logs so that we can confirm whether the
classpath and FileSystem are loaded correctly?



Best,
Yang

Senthil Kumar <se...@vmware.com> wrote on Fri, Jan 17, 2020 at 10:57 PM:

Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a streaming job from one S3 bucket into another S3 bucket using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in /usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed that, for Flink 1.9.0 and above (according to this)

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/



it seems that I need to install the jar file in the plugins directory (/usr/lib/flink/plugins/s3-fs-hadoop)



That did not work either.



At this point, I am not sure what to do and would appreciate some help!



Cheers

Kumar


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Aaron Langford <aa...@gmail.com>.
Senthil,

One of the key steps in debugging this for me was enabling debug level logs
on my cluster, and then looking at the logs in the resource manager. The
failure you are after happens before the exceptions you have reported here.
When your Flink application is starting, it will attempt to load various
file system implementations. You can see which ones it successfully loaded
when you have the debug level of logs configured. You will have to do some
digging, but this is a good place to start. Try to discover if your
application is indeed loading the s3 file system, or if that is not
happening. You should be able to find the file system implementations that
were loaded by searching for the string "Added file system".
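
As a rough illustration of that search, here is a minimal Python sketch that
filters a log dump for the "Added file system" string. The function names and
the idea of checking the matches for "s3" are my own assumptions for
illustration; nothing is assumed about the rest of each log line.

```python
from pathlib import Path
from typing import List

# Search string taken from the message above; everything else on the
# log line is treated as opaque text.
MARKER = "Added file system"


def find_loaded_file_systems(log_text: str) -> List[str]:
    """Return every log line that contains the marker string."""
    return [line.strip() for line in log_text.splitlines() if MARKER in line]


def mentions_scheme(lines: List[str], scheme: str) -> bool:
    """True if any matched line mentions the given scheme (case-insensitive)."""
    needle = scheme.lower()
    return any(needle in line.lower() for line in lines)


def scan_log_file(path: str) -> List[str]:
    """Read a log file (whichever one you pulled from the cluster) and filter it."""
    text = Path(path).read_text(encoding="utf-8", errors="replace")
    return find_loaded_file_systems(text)
```

You would point scan_log_file at the jobmanager or taskmanager log you
downloaded; if none of the matching lines mentions s3, the s3 file system was
probably not picked up.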

Also, do you mind sharing the bootstrap action script that you are using to
get the s3 file system in place?
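
For what it is worth, something like the following Python sketch could be run
on a node to see how the jars ended up laid out. It assumes, as described
elsewhere in this thread, that each plugin jar should sit in its own subfolder
under the plugins directory; the function name and the returned dictionary
keys are hypothetical.

```python
from pathlib import Path
from typing import Dict, List


def describe_plugins_dir(plugins_dir: str) -> Dict[str, List[str]]:
    """List jars found in plugin subfolders and jars placed directly in the root."""
    root = Path(plugins_dir)
    layout: Dict[str, List[str]] = {"plugins": [], "misplaced": []}
    if not root.is_dir():
        # Nothing to report if the directory does not exist.
        return layout
    for entry in sorted(root.iterdir()):
        if entry.is_file() and entry.suffix == ".jar":
            # A jar sitting directly in the plugins root, not in a subfolder.
            layout["misplaced"].append(entry.name)
        elif entry.is_dir():
            for jar in sorted(entry.glob("*.jar")):
                layout["plugins"].append(f"{entry.name}/{jar.name}")
    return layout
```

Calling describe_plugins_dir("/usr/lib/flink/plugins") on the cluster (the
path from the original post) should show the s3 jar under "plugins"; an empty
result or an entry under "misplaced" would be worth a closer look.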

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <se...@vmware.com> wrote:

> Yang, I appreciate your help! Please let me know if I can provide any
> other info.
>
>
>
> I resubmitted my executable jar file as a step to the Flink EMR cluster, and
> here are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during disposal of stream
> operator.
>
> java.lang.NullPointerException
>
>         at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>
>         at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> *From: *Yang Wang <da...@gmail.com>
> *Date: *Saturday, January 18, 2020 at 7:58 PM
> *To: *Senthil Kumar <se...@vmware.com>
> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> I don't think this exception is caused by the Hadoop version being too
> low.
>
> It seems that the "s3" URI scheme was not recognized by
> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>
> Could you share the debug-level jobmanager/taskmanager logs so that we
> can confirm whether the classpath and FileSystem are loaded correctly?
>
>
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> Senthil Kumar <se...@vmware.com> wrote on Fri, Jan 17, 2020 at 10:57 PM:
>
> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a streaming job from one S3 bucket into another S3
> bucket using the StreamingFileSink
>
>
>
> I got the infamous exception:
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>
>
>
> According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
> /usr/lib/flink/lib
>
>
> https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
>
>
>
> That did not work.
>
>
>
> Further googling revealed that, for Flink 1.9.0 and above (according to this)
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>
>
>
> it seems that I need to install the jar file in the plugins directory
> (/usr/lib/flink/plugins/s3-fs-hadoop)
>
>
>
> That did not work either.
>
>
>
> At this point, I am not sure what to do and would appreciate some help!
>
>
>
> Cheers
>
> Kumar
>
>
>
>

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Senthil Kumar <se...@vmware.com>.
Yang, I appreciate your help! Please let me know if I can provide any other info.

I resubmitted my executable jar file as a step to the Flink EMR cluster, and here are all the exceptions. I see two of them.

I fished them out of /var/log/Hadoop/<STEP-ID>/syslog


2020-01-21 16:31:37,587 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer

        at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

        at java.lang.Thread.run(Thread.java:748)


From: Yang Wang <da...@gmail.com>
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar <se...@vmware.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I don't think this exception is caused by the Hadoop version being too low.
It seems that the "s3" URI scheme was not recognized by `S3FileSystemFactory`, so it falls back to
the `HadoopFsFactory`.

Could you share the debug-level jobmanager/taskmanager logs so that we can confirm whether the
classpath and FileSystem are loaded correctly?



Best,
Yang

Senthil Kumar <se...@vmware.com> wrote on Fri, Jan 17, 2020 at 10:57 PM:

Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a streaming job from one S3 bucket into another S3 bucket using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in /usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed that, for Flink 1.9.0 and above (according to this)

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/



it seems that I need to install the jar file in the plugins directory (/usr/lib/flink/plugins/s3-fs-hadoop)



That did not work either.



At this point, I am not sure what to do and would appreciate some help!



Cheers

Kumar


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Posted by Yang Wang <da...@gmail.com>.
I don't think this exception is caused by the Hadoop version being too low.
It seems that the "s3" URI scheme was not recognized by
`S3FileSystemFactory`, so it falls back to
the `HadoopFsFactory`.
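
To make the fallback concrete, here is a toy Python model of the behaviour
described above. It is not Flink's actual lookup code: the `pick_factory`
function and the `registered` mapping are hypothetical, the bucket name is
made up, and only the two factory names come from this message.

```python
from typing import Dict
from urllib.parse import urlparse

# Name of the generic fallback mentioned in the message above.
FALLBACK_FACTORY = "HadoopFsFactory"


def pick_factory(uri: str, registered: Dict[str, str]) -> str:
    """Return the factory registered for the URI scheme, else the fallback."""
    scheme = urlparse(uri).scheme.lower()
    return registered.get(scheme, FALLBACK_FACTORY)


# With a factory registered for "s3", that factory is chosen.
print(pick_factory("s3://my-bucket/out", {"s3": "S3FileSystemFactory"}))
# With nothing registered for "s3", the lookup falls through to the fallback.
print(pick_factory("s3://my-bucket/out", {}))
```

The second call corresponds to the situation suspected here: the sink ends up
on the generic Hadoop file system rather than the S3 one, which would explain
why the exception comes from HadoopRecoverableWriter.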

Could you share the debug-level jobmanager/taskmanager logs so that we can
confirm whether the
classpath and FileSystem are loaded correctly?



Best,
Yang

Senthil Kumar <se...@vmware.com> wrote on Fri, Jan 17, 2020 at 10:57 PM:

> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a streaming job from one S3 bucket into another S3
> bucket using the StreamingFileSink
>
>
>
> I got the infamous exception:
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>
>
>
> According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
> /usr/lib/flink/lib
>
>
> https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
>
>
>
> That did not work.
>
>
>
> Further googling revealed that, for Flink 1.9.0 and above (according to this)
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>
>
>
> it seems that I need to install the jar file in the plugins directory
> (/usr/lib/flink/plugins/s3-fs-hadoop)
>
>
>
> That did not work either.
>
>
>
> At this point, I am not sure what to do and would appreciate some help!
>
>
>
> Cheers
>
> Kumar
>
>
>