Posted to user@flink.apache.org by Yitzchak Lieberman <yi...@sentinelone.com> on 2019/06/11 07:16:25 UTC

StreamingFileSink in version 1.8

Hi.


I'm a bit confused:

When launching my Flink streaming application on EMR release 5.24 (which ships
with Flink 1.8) to write Kafka messages to S3 as Parquet files, I get the
exception below, but when I install Flink 1.8 on EMR myself as a custom
installation it works.

What could explain the difference in behavior?


Thanks,

Yitzchak.


Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
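
For context, a job that hits this code path generally looks like the sketch below. The poster's actual code isn't in the thread, so the event type, Kafka topic, and bucket path are placeholders, and ParquetAvroWriters is just one possible bulk format:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3Parquet {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk-format part files are only finalized on checkpoints.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "parquet-writer");

        DataStream<String> lines = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));

        // The scheme of this path decides which FileSystem (and which
        // RecoverableWriter) Flink picks at runtime -- this is where the
        // exception above originates.
        StreamingFileSink<MyEvent> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/parquet-out"),
                        ParquetAvroWriters.forReflectRecord(MyEvent.class))
                .build();

        lines.map(MyEvent::parse).returns(MyEvent.class).addSink(sink);

        env.execute("kafka-to-s3-parquet");
    }

    /** Placeholder event type (public no-arg POJO, Avro reflect-record friendly). */
    public static class MyEvent {
        public String payload;

        static MyEvent parse(String line) {
            MyEvent e = new MyEvent();
            e.payload = line;
            return e;
        }
    }
}

The scheme of the sink path is what determines which FileSystem implementation Flink resolves, which is where the two EMR setups end up behaving differently.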

Re: StreamingFileSink in version 1.8

Posted by Yitzchak Lieberman <yi...@sentinelone.com>.
Hi.

I found that the problem was that I didn't have flink-s3-fs-hadoop-<version>.jar
in the Flink lib directory; with that jar in place I can use the 's3a' protocol.
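
For anyone else hitting this on EMR: flink-s3-fs-hadoop-<version>.jar ships in Flink's opt/ directory, and once it is copied into lib/, the s3a:// scheme is served by Flink's bundled S3 filesystem, which does provide a recoverable writer. Relative to the sketch earlier in the thread, only the sink path changes (bucket and path are still placeholders):

    // Same job as the sketch above; only the sink URI changes.
    // s3a:// is handled by flink-s3-fs-hadoop (once the jar is in lib/),
    // which supplies the recoverable writer the StreamingFileSink needs.
    StreamingFileSink<MyEvent> sink = StreamingFileSink
            .forBulkFormat(new Path("s3a://my-bucket/parquet-out"),
                    ParquetAvroWriters.forReflectRecord(MyEvent.class))
            .build();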

On Tue, Jun 11, 2019 at 4:48 PM Ken Krugler <kk...@transpac.com> wrote:

> The code in HadoopRecoverableWriter is:
>
> if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
>     throw new UnsupportedOperationException(
>         "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
> }
>
> So one possibility is that your sink path doesn’t have the explicit hdfs://xxx protocol.
>
> Another is that you’re in classpath hell, and your job jar contains an older version of Hadoop jars.
>
> — Ken

Re: StreamingFileSink in version 1.8

Posted by Ken Krugler <kk...@transpac.com>.
The code in HadoopRecoverableWriter is:

		if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
			throw new UnsupportedOperationException(
					"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
		}

So one possibility is that your sink path doesn’t have the explicit hdfs://xxx protocol. 

Another is that you’re in classpath hell, and your job jar contains an older version of Hadoop jars.
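
A quick way to tell which of those applies is to check which FileSystem implementation Flink resolves for the sink URI, and whether it can hand out a recoverable writer. A rough standalone sketch (the URI is a placeholder for the actual sink path):

import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public class RecoverableWriterCheck {

    public static void main(String[] args) throws Exception {
        // Replace with the exact URI handed to the StreamingFileSink.
        URI sinkUri = URI.create("s3a://my-bucket/parquet-out");

        FileSystem fs = FileSystem.get(sinkUri);
        System.out.println(sinkUri.getScheme() + " -> " + fs.getClass().getName());

        // HadoopFileSystem throws the UnsupportedOperationException from the stack
        // trace above unless the scheme is hdfs:// on Hadoop 2.7+; Flink's own S3
        // filesystems (e.g. flink-s3-fs-hadoop) return a recoverable writer instead.
        fs.createRecoverableWriter();
        System.out.println("recoverable writer available");
    }
}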

— Ken


> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <yi...@sentinelone.com> wrote:
> 
> Hi.
> 
> I'm a bit confused:
> When launching my Flink streaming application on EMR release 5.24 (which ships with Flink 1.8) to write Kafka messages to S3 as Parquet files, I get the exception below, but when I install Flink 1.8 on EMR myself as a custom installation it works.
> What could explain the difference in behavior?
> 
> Thanks,
> Yitzchak.
> 
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra