Posted to user@flink.apache.org by Guowei Ma <gu...@gmail.com> on 2021/04/01 00:29:02 UTC

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

Hi, Robert
I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/".
This is because `StreamingFileSink` currently supports only the Hadoop-based
s3 file system, not the Presto-based one. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations

Best,
Guowei
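
Flink routes a path to a file-system plugin by the URI scheme alone, which is why changing only the scheme is enough here: "s3://" may be claimed by either the Presto-based or the Hadoop-based plugin depending on which is installed, while "s3a://" is served only by the Hadoop-based one. A small standalone sketch of the scheme parsing (plain JDK, bucket name taken from the thread):

```java
import java.net.URI;

// Demonstrates that "s3://..." and "s3a://..." differ only in the URI
// scheme, which is the key Flink uses to pick a file-system plugin.
// Which plugin actually serves "s3" depends on what is installed;
// "s3a" always selects the Hadoop-based implementation.
public class SchemeDemo {
    static String scheme(String path) {
        return URI.create(path).getScheme();
    }

    public static void main(String[] args) {
        System.out.println(scheme("s3://argo-artifacts/"));   // prints "s3"
        System.out.println(scheme("s3a://argo-artifacts/"));  // prints "s3a"
    }
}
```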


On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen <ci...@gmail.com> wrote:

> I’m using a local MinIO instance on my Kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSink
> configuration:
>
>
>         final StreamingFileSink<Tuple2<String, Long>> sink =
>                 StreamingFileSink.forRowFormat(
>                         new Path("s3://argo-artifacts/"),
>                         new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>                         .withBucketAssigner(new KeyBucketAssigner())
>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                         .withOutputFileConfig(config)
>                         .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
>     at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490
>

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Robert
It seems that your AccessKeyId is not valid.
You can find more details in [1] about how to configure the
s3 access key.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/
Best,
Guowei
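
For reference, the docs in [1] configure the access key in flink-conf.yaml, with the s3 filesystem itself loaded from the Flink distribution's plugins/ directory. A minimal sketch for a MinIO-backed setup (the endpoint and key values below are placeholders, not values from this thread):

```yaml
# flink-conf.yaml sketch; assumes flink-s3-fs-hadoop has been copied
# from opt/ into plugins/s3-fs-hadoop/ in the Flink distribution.
# Endpoint and credentials are hypothetical placeholders for MinIO.
s3.endpoint: http://minio.example.svc:9000
s3.path.style.access: true   # MinIO typically requires path-style addressing
s3.access-key: <minio-access-key>
s3.secret-key: <minio-secret-key>
```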


On Thu, Apr 1, 2021 at 9:19 PM Robert Cullen <ci...@gmail.com> wrote:

> Guowei,
>
> I changed to “s3a://argo-artifacts/” but now I get this error. BTW, I'm
> using the example playground from here:
>
> [1] https://docs.ververica.com/getting_started/installation.html
>
> org.apache.flink.util.SerializedThrowable: 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate MultiPartUpload on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; Proxy: null), S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) ~[?:?]
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) ~[?:?]
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198) ~[?:?]
>     at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:62) ~[?:?]
>     at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:253) ~[?:?]
>     at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68) ~[?:?]
>     at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:78) ~[?:?]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
> Caused by: org.apache.flink.util.SerializedThrowable: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; Proxy: null)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
>     at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3581) ~[?:?]
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597) ~[?:?]
>     at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199) ~[?:?]
>     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ~[?:?]
>     ... 35 more
>
>
> On Wed, Mar 31, 2021 at 8:29 PM Guowei Ma <gu...@gmail.com> wrote:
>
>> Hi, Robert
>> I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/".
>> This is because `StreamingFileSink` currently supports only the Hadoop-based
>> s3 file system, not the Presto-based one. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen <ci...@gmail.com>
>> wrote:
>>
>>> I’m using a local MinIO instance on my Kubernetes cluster for
>>> checkpoint/savepoint storage. I’m using this StreamingFileSink
>>> configuration:
>>>
>>>
>>>         final StreamingFileSink<Tuple2<String, Long>> sink =
>>>                 StreamingFileSink.forRowFormat(
>>>                         new Path("s3://argo-artifacts/"),
>>>                         new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>>>                         .withBucketAssigner(new KeyBucketAssigner())
>>>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>                         .withOutputFileConfig(config)
>>>                         .build();
>>>
>>> Anyone know how to fix this exception?
>>>
>>> [stack trace snipped; identical to the one quoted above]
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

Posted by Robert Cullen <ci...@gmail.com>.
Guowei,

 I changed to “s3a://argo-artifacts/” but now I get this error. BTW, I'm
using the example playground from here:

[1] https://docs.ververica.com/getting_started/installation.html

org.apache.flink.util.SerializedThrowable: 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate MultiPartUpload on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; Proxy: null), S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) ~[?:?]
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) ~[?:?]
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198) ~[?:?]
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:62) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:253) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:78) ~[?:?]
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
Caused by: org.apache.flink.util.SerializedThrowable: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
    at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3581) ~[?:?]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597) ~[?:?]
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ~[?:?]
    ... 35 more


On Wed, Mar 31, 2021 at 8:29 PM Guowei Ma <gu...@gmail.com> wrote:

> Hi, Robert
> I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/".
> This is because `StreamingFileSink` currently supports only the Hadoop-based
> s3 file system, not the Presto-based one. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen <ci...@gmail.com>
> wrote:
>
>> I’m using a local MinIO instance on my Kubernetes cluster for
>> checkpoint/savepoint storage. I’m using this StreamingFileSink
>> configuration:
>>
>>
>>         final StreamingFileSink<Tuple2<String, Long>> sink =
>>                 StreamingFileSink.forRowFormat(
>>                         new Path("s3://argo-artifacts/"),
>>                         new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>>                         .withBucketAssigner(new KeyBucketAssigner())
>>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>                         .withOutputFileConfig(config)
>>                         .build();
>>
>> Anyone know how to fix this exception?
>>
>> [stack trace snipped; identical to the one quoted above]
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490