Posted to user@flink.apache.org by jonas eyob <jo...@gmail.com> on 2021/08/26 13:42:38 UTC

Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Hey,

I am setting up HA on a standalone Kubernetes Flink application job
cluster.
I am using Flink 1.12.5 with S3 as the storage backend.

* The JobManager fails shortly after it starts with the following errors
(apologies in advance for the length), and I can't understand what's going
on.
* At first I thought it might be due to the IAM role missing Delete
privileges, so I added them, but the problem persists.
* The configured S3 bucket, s3://<company>/recovery, is empty.

configmap.yaml
flink-conf.yaml: |+
  jobmanager.rpc.address: {{ $fullName }}-jobmanager
  jobmanager.rpc.port: 6123
  jobmanager.memory.process.size: 1600m
  taskmanager.numberOfTaskSlots: 2
  taskmanager.rpc.port: 6122
  taskmanager.memory.process.size: 1728m
  blob.server.port: 6124
  queryable-state.proxy.ports: 6125
  parallelism.default: 2
  scheduler-mode: reactive
  execution.checkpointing.interval: 10s
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  kubernetes.cluster-id: {{ $fullName }}
  high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
  hive.s3.use-instance-credentials: true
  kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods

role.yaml
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ $fullName }}
  namespace: {{ $fullName }}
  labels:
    app: {{ $appName }}
    chart: {{ template "thoros.chart" . }}
    release: {{ .Release.Name }}
    heritage: {{ .Release.Service }}

rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]

AWS IAM policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:Get*",
                "s3:Put*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::<company>-flink-dev/*"
            ],
            "Effect": "Allow"
        }
    ]
}

*Error-log:*
2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
               [] - Executing pipeline using FlinkRunner.
2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
               [] - For maximum performance you should set the 'fasterCopy'
option. See more at https://issues.apache.org/jira/browse/BEAM-11146
2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
               [] - Translating pipeline to Flink program.
2021-08-26 13:08:43,456 INFO
 org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
unbounded PCollection. Switching to streaming execution.
2021-08-26 13:08:43,461 INFO
 org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating
a Streaming Environment.
2021-08-26 13:08:43,462 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.address, thoros-jobmanager
2021-08-26 13:08:43,462 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.port, 6123
2021-08-26 13:08:43,462 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2021-08-26 13:08:43,463 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 2
2021-08-26 13:08:43,463 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.rpc.port, 6122
2021-08-26 13:08:43,463 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2021-08-26 13:08:43,463 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: blob.server.port, 6124
2021-08-26 13:08:43,464 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: queryable-state.proxy.ports, 6125
2021-08-26 13:08:43,464 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: parallelism.default, 2
2021-08-26 13:08:43,465 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: scheduler-mode, reactive
2021-08-26 13:08:43,465 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: execution.checkpointing.interval, 10s
2021-08-26 13:08:43,466 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: restart-strategy, fixed-delay
2021-08-26 13:08:43,466 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: restart-strategy.fixed-delay.attempts, 10
2021-08-26 13:08:43,466 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: high-availability,
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-08-26 13:08:43,467 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: kubernetes.cluster-id, thoros
2021-08-26 13:08:43,467 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: high-availability.storageDir,
s3://<company>-flink-dev/recovery
2021-08-26 13:08:43,468 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: hive.s3.use-instance-credentials, true
2021-08-26 13:08:43,468 INFO
 org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: kubernetes.namespace, thoros
2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner
               [] - Starting execution of Flink program.
2021-08-26 13:08:45,444 INFO
 org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Job 00000000000000000000000000000000 is submitted.
2021-08-26 13:08:45,454 INFO
 org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Submitting Job with JobId=00000000000000000000000000000000.
2021-08-26 13:08:45,486 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received
JobGraph submission 00000000000000000000000000000000
(main0-flink-0826130845-6f3e805f).
2021-08-26 13:08:45,498 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] -
Submitting job 00000000000000000000000000000000
(main0-flink-0826130845-6f3e805f).
2021-08-26 13:08:46,152 INFO
 org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed
job graph 00000000000000000000000000000000 from
KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
2021-08-26 13:08:46,169 INFO
 org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
Clean up the high availability data for job
00000000000000000000000000000000.
2021-08-26 13:08:46,213 INFO
 org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
Finished cleaning up the high availability data for job
00000000000000000000000000000000.
2021-08-26 13:08:46,231 WARN
 org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed
to delete blob at
s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
2021-08-26 13:08:46,239 ERROR
org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to
submit job 00000000000000000000000000000000.
java.lang.RuntimeException: java.lang.Exception: Could not open output
stream for state backend
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
~[?:1.8.0_302]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
~[?:1.8.0_302]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_302]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.Actor.aroundReceive(Actor.scala:517)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
Caused by: java.lang.Exception: Could not open output stream for state
backend
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
Caused by:
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
V0BWCA4RDVE0EVK8; S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
Proxy: null), S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
(Path:
s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
~[?:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
~[?:?]
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
-- 
*Kind regards*
*Jonas Eyob*

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Posted by jonas eyob <jo...@gmail.com>.
@Svend - that seems to have done the trick: adding the bucket itself as a
resource got Flink to write to the configured S3 bucket.
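
A likely explanation, sketched under assumptions: the stack trace in the
original message fails in getS3ObjectMetadata, an existence check on a key
that has not been written yet. S3 answers such a request with 403 rather
than 404 when the caller lacks s3:ListBucket, and s3:ListBucket applies to
the bucket ARN itself, which a "/*" pattern does not cover. The Python below
only mimics the resource matching with fnmatch-style wildcards. The bucket
name example-flink-dev and the helper resource_matches are hypothetical, and
real IAM evaluation involves much more than this.

```python
from fnmatch import fnmatchcase

# Hypothetical bucket name standing in for the redacted one in the thread.
BUCKET_ARN = "arn:aws:s3:::example-flink-dev"
OBJECT_ARN = BUCKET_ARN + "/recovery/default/submittedJobGraph"

# "Resource" lists from the two policies discussed in the thread.
ORIGINAL_RESOURCES = [BUCKET_ARN + "/*"]
FIXED_RESOURCES = [BUCKET_ARN, BUCKET_ARN + "/*"]


def resource_matches(patterns, arn):
    """Return True if any Resource pattern covers the given ARN.

    Simplification: fnmatch-style '*' and '?' stand in for IAM wildcards.
    """
    return any(fnmatchcase(arn, pattern) for pattern in patterns)


if __name__ == "__main__":
    # Object-level actions (Get*, Put*, Delete*) target object ARNs.
    print("original covers object:", resource_matches(ORIGINAL_RESOURCES, OBJECT_ARN))
    # s3:ListBucket targets the bucket ARN, which "/*" alone never matches.
    print("original covers bucket:", resource_matches(ORIGINAL_RESOURCES, BUCKET_ARN))
    print("fixed covers bucket:", resource_matches(FIXED_RESOURCES, BUCKET_ARN))
```

With the original policy the bucket ARN is not covered, so the ListBucket
grant never takes effect, which would be consistent with the 403 seen here.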

@Gil - we manage our Kubernetes cluster on AWS with kops, but we do assign
the IAM roles through the deployment annotations. It seems Presto is able to
use the s3:// scheme in our case.

Thanks both!

On Thu, 26 Aug 2021 at 17:59, Gil De Grove <gi...@euranova.eu> wrote:

> Hi Jonas,
>
>
>
> Just wondering, are you trying to deploy via IAM service account
> annotations in an AWS EKS cluster?
>
> We noticed that when using Presto, the IAM credentials were obtained
> through the EC2 metadata API inside AWS. However, with an EKS service
> account, the API used is web token auth.
>
> Not sure if the solution we found is the appropriate one, but switching to
> s3a instead of Presto and forcing the AWS default provider chain did the
> trick.
>
> Maybe you could try that.
>
> Regards,
> Gil
>
> On Thu, Aug 26, 2021, 18:45 Svend <st...@svend.xyz> wrote:
>
>> Hi Jonas,
>>
>> Just a thought, could you try this policy? If I recall correctly,
>> you need ListBucket on the bucket itself, whereas the other actions can
>> have a path prefix like the "/*" you added.
>>
>> "
>> {
>>     "Version": "2012-10-17",
>>     "Statement": [
>>         {
>>             "Action": [
>>                 "s3:ListBucket",
>>                 "s3:Get*",
>>                 "s3:Put*",
>>                 "s3:Delete*"
>>             ],
>>             "Resource": [
>>                 "arn:aws:s3:::<company>-flink-dev",
>>                 "arn:aws:s3:::<company>-flink-dev/*"
>>             ],
>>             "Effect": "Allow"
>>         }
>>     ]
>> }
>> "
>>
>> Svend
>>
>>
>> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>>
>> Hey Matthias,
>>
>> Yes, I have followed the documentation on the link you provided - and
>> decided to go for the recommended approach of using IAM roles.
>> The hive.s3.use-instance-credentials configuration parameter I got from
>> [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which
>> says:
>>
>> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
>> is based on code from the Presto project <https://prestodb.io/>. You can
>> configure it using the same configuration keys as the Presto file system
>> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
>> by adding the configurations to your flink-conf.yaml. The Presto S3
>> implementation is the recommended file system for checkpointing to S3....
>>
>> Is it possible I am misunderstanding it?
>>
>> Best,
>> Jonas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>
>> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matthias@ververica.com> wrote:
>>
>> Hi Jonas,
>> have you included the S3 credentials in the Flink config file as
>> described in [1]? I'm not sure that hive.s3.use-instance-credentials
>> is a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jo...@gmail.com> wrote:
>>
>> Hey,
>>
>> I am setting up HA on a standalone Kubernetes Flink application job
>> cluster.
>> Flink (1.12.5) is used and I am using S3 as the storage backend
>>
>> * The JobManager shortly fails after starts with the following errors
>> (apologies in advance for the length), and I can't understand what's going
>> on.
>> * First I thought it may be due to missing Delete privileges of the IAM
>> role and updated that, but the problem persists.
>> * The S3 bucket configured s3://<company>/recovery is empty.
>>
>> configmap.yaml
>> flink-conf.yaml: |+
>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.numberOfTaskSlots: 2
>> taskmanager.rpc.port: 6122
>> taskmanager.memory.process.size: 1728m
>> blob.server.port: 6124
>> queryable-state.proxy.ports: 6125
>> parallelism.default: 2
>> scheduler-mode: reactive
>> execution.checkpointing.interval: 10s
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> kubernetes.cluster-id: {{ $fullName }}
>> high-availability.storageDir: s3://<company>-flink-{{ .Values.environment
>> }}/recovery
>> hive.s3.use-instance-credentials: true
>> kubernetes.namespace: {{ $fullName }} # The namespace that will be used
>> for running the jobmanager and taskmanager pods
>>
>> role.yaml
>> kind: Role
>> apiVersion: rbac.authorization.k8s.io/v1
>> metadata:
>> name: {{ $fullName }}
>> namespace: {{ $fullName }}
>> labels:
>> app: {{ $appName }}
>> chart: {{ template "thoros.chart" . }}
>> release: {{ .Release.Name }}
>> heritage: {{ .Release.Service }}
>>
>> rules:
>> - apiGroups: [""]
>> resources: ["configmaps"]
>> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>
>> aws IAM policy
>> {
>>     "Version": "2012-10-17",
>>     "Statement": [
>>         {
>>             "Action": [
>>                 "s3:ListBucket",
>>                 "s3:Get*",
>>                 "s3:Put*",
>>                 "s3:Delete*"
>>             ],
>>             "Resource": [
>>                 "arn:aws:s3:::<company>-flink-dev/*"
>>             ],
>>             "Effect": "Allow"
>>         }
>>     ]
>> }
>>
>> *Error-log:*
>> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
>>                  [] - Executing pipeline using FlinkRunner.
>> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
>>                  [] - For maximum performance you should set the
>> 'fasterCopy' option. See more at
>> https://issues.apache.org/jira/browse/BEAM-11146
>> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
>>                  [] - Translating pipeline to Flink program.
>> 2021-08-26 13:08:43,456 INFO
>>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
>> unbounded PCollection. Switching to streaming execution.
>> 2021-08-26 13:08:43,461 INFO
>>  org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating
>> a Streaming Environment.
>> 2021-08-26 13:08:43,462 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: jobmanager.rpc.address, thoros-jobmanager
>> 2021-08-26 13:08:43,462 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2021-08-26 13:08:43,462 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: jobmanager.memory.process.size, 1600m
>> 2021-08-26 13:08:43,463 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: taskmanager.numberOfTaskSlots, 2
>> 2021-08-26 13:08:43,463 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: taskmanager.rpc.port, 6122
>> 2021-08-26 13:08:43,463 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: taskmanager.memory.process.size, 1728m
>> 2021-08-26 13:08:43,463 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: blob.server.port, 6124
>> 2021-08-26 13:08:43,464 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: queryable-state.proxy.ports, 6125
>> 2021-08-26 13:08:43,464 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: parallelism.default, 2
>> 2021-08-26 13:08:43,465 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: scheduler-mode, reactive
>> 2021-08-26 13:08:43,465 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: execution.checkpointing.interval, 10s
>> 2021-08-26 13:08:43,466 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: restart-strategy, fixed-delay
>> 2021-08-26 13:08:43,466 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: restart-strategy.fixed-delay.attempts, 10
>> 2021-08-26 13:08:43,466 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: high-availability,
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> 2021-08-26 13:08:43,467 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: kubernetes.cluster-id, thoros
>> 2021-08-26 13:08:43,467 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: high-availability.storageDir,
>> s3://<company>-flink-dev/recovery
>> 2021-08-26 13:08:43,468 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: hive.s3.use-instance-credentials, true
>> 2021-08-26 13:08:43,468 INFO
>>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
>> configuration property: kubernetes.namespace, thoros
>> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner
>>                  [] - Starting execution of Flink program.
>> 2021-08-26 13:08:45,444 INFO
>>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 00000000000000000000000000000000 is submitted.
>> 2021-08-26 13:08:45,454 INFO
>>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=00000000000000000000000000000000.
>> 2021-08-26 13:08:45,486 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received
>> JobGraph submission 00000000000000000000000000000000
>> (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:45,498 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] -
>> Submitting job 00000000000000000000000000000000
>> (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:46,152 INFO
>>  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed
>> job graph 00000000000000000000000000000000 from
>> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>> 2021-08-26 13:08:46,169 INFO
>>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Clean up the high availability data for job
>> 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,213 INFO
>>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Finished cleaning up the high availability data for job
>> 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,231 WARN
>>  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed
>> to delete blob at
>> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>> 2021-08-26 13:08:46,239 ERROR
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to
>> submit job 00000000000000000000000000000000.
>> java.lang.RuntimeException: java.lang.Exception: Could not open output
>> stream for state backend
>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>> ~[?:1.8.0_302]
>> at
>> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
>> ~[?:1.8.0_302]
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> ~[?:1.8.0_302]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> Caused by: java.lang.Exception: Could not open output stream for state
>> backend
>> at
>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> ... 27 more
>> Caused by:
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
>> V0BWCA4RDVE0EVK8; S3 Extended Request ID:
>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
>> Proxy: null), S3 Extended Request ID:
>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
>> (Path:
>> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
>> ~[?:?]
>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>> ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>> ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
>> ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>> at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
>> ~[?:?]
>> at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
>> ~[?:?]
>> at
>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> ... 27 more
>> --
>> *Kind regards*
>> *Jonas Eyob*
>>

-- 
*Kind regards*
*Jonas Eyob*

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Posted by Gil De Grove <gi...@euranova.eu>.
Hi Jonas,



Just wondering, are you trying to deploy via IAM service account
annotations in an AWS EKS cluster?

We noticed that when using Presto, the IAM service account was using the EC2
metadata API inside AWS. However, when using an EKS service account, the API
used is the web token auth.

I'm not sure the solution we found is the appropriate one, but switching to
s3a instead of Presto and forcing the AWS default provider chain did the
trick.

Maybe you could try that.
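
One way to check which situation applies is to look at the environment
inside the JobManager pod. The sketch below is only an illustration: it
assumes that EKS service account annotations inject the AWS_ROLE_ARN and
AWS_WEB_IDENTITY_TOKEN_FILE variables into the pod, and the function name
and return labels are hypothetical. It only guesses from environment
variables and does not call AWS or Flink.

```python
import os


def likely_credential_source(env):
    """Guess the AWS credential source from a pod's environment variables.

    Rough approximation only: static keys are checked first, then the
    web identity token variables, and anything else is assumed to fall
    back to the instance metadata API (or to no credentials at all).
    """
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "static-keys"
    if env.get("AWS_ROLE_ARN") and env.get("AWS_WEB_IDENTITY_TOKEN_FILE"):
        return "web-identity-token"
    return "instance-metadata-or-none"


if __name__ == "__main__":
    # Run inside the pod to inspect the real environment.
    print(likely_credential_source(os.environ))
```

If this reports "web-identity-token" while the file system only asks the
instance metadata API, the requests would be signed with the node's role
rather than the service account's role, which could explain a 403.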

Regards,
Gil


Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Posted by Svend <st...@svend.xyz>.
Hi Jonas,

Just a thought: could you try this policy? If I recall correctly, you need ListBucket on the bucket itself, whereas the other actions can have a path prefix like the "/*" you added.

"
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:Get*",
                "s3:Put*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::<company>-flink-dev",
                "arn:aws:s3:::<company>-flink-dev/*"
            ],
            "Effect": "Allow"
        }
    ]
}
"

Svend
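
For context, the 403 in the stack trace is raised from getS3ObjectMetadata, which is called while checking whether the job graph file already exists. As far as I know, S3 answers such a metadata request for a missing key with 403 rather than 404 when the caller lacks s3:ListBucket on the bucket. The sketch below compares the two policies under simplifying assumptions: it only looks at Allow statements, ignores Deny, Condition and NotAction, matches wildcards with fnmatch, and uses a hypothetical bucket name (example-flink-dev) in place of the redacted one.

```python
from fnmatch import fnmatchcase


def _as_list(value):
    """IAM allows a single string or a list for Action and Resource."""
    return value if isinstance(value, list) else [value]


def grants_on(policy, action, arn):
    """Return True if some Allow statement matches both action and ARN.

    Simplified illustration, not a full IAM evaluator.
    """
    for stmt in _as_list(policy["Statement"]):
        if stmt.get("Effect") != "Allow":
            continue
        actions = _as_list(stmt.get("Action", []))
        resources = _as_list(stmt.get("Resource", []))
        if any(fnmatchcase(action, a) for a in actions) and any(
            fnmatchcase(arn, r) for r in resources
        ):
            return True
    return False


BUCKET = "arn:aws:s3:::example-flink-dev"  # hypothetical bucket name
ACTIONS = ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"]

# Policy from the original post: object-level resource only.
original = {"Statement": [
    {"Effect": "Allow", "Action": ACTIONS, "Resource": [BUCKET + "/*"]}
]}
# Suggested policy: bucket ARN added next to the object-level resource.
suggested = {"Statement": [
    {"Effect": "Allow", "Action": ACTIONS, "Resource": [BUCKET, BUCKET + "/*"]}
]}

print(grants_on(original, "s3:ListBucket", BUCKET))
print(grants_on(suggested, "s3:ListBucket", BUCKET))
```

In the original policy the ListBucket action is listed, but its only resource is the object-level "/*" pattern, which the bucket ARN itself does not match.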


On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
> 
> Yes, I have followed the documentation on the link you provided - and decided to go for the recommended approach of using IAM roles. 
> The hive.s3.use-instance-credentials configuration parameter I got from [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
> 
> ..`flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is based on code from the Presto project <https://prestodb.io/>. You can configure it using the same configuration keys as the Presto file system <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>, by adding the configurations to your `flink-conf.yaml`. The Presto S3 implementation is the recommended file system for checkpointing to S3....
> 
> It's possible I am misunderstanding it?
> 
> Best,
> Jonas
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> 
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <ma...@ververica.com> wrote:
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's described in [1]? I'm not sure about this hive.s3.use-instance-credentials being a valid configuration parameter. 
>> 
>> Best,
>> Matthias
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> 
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jo...@gmail.com> wrote:
>>> Hey,
>>> 
>>> I am setting up HA on a standalone Kubernetes Flink application job cluster. 
>>> Flink (1.12.5) is used and I am using S3 as the storage backend 
>>> 
>>> * The JobManager shortly fails after starts with the following errors (apologies in advance for the length), and I can't understand what's going on.
>>> * First I thought it may be due to missing Delete privileges of the IAM role and updated that, but the problem persists. 
>>> * The S3 bucket configured s3://<company>/recovery is empty.
>>> 
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>> jobmanager.rpc.port: 6123
>>> jobmanager.memory.process.size: 1600m
>>> taskmanager.numberOfTaskSlots: 2
>>> taskmanager.rpc.port: 6122
>>> taskmanager.memory.process.size: 1728m
>>> blob.server.port: 6124
>>> queryable-state.proxy.ports: 6125
>>> parallelism.default: 2
>>> scheduler-mode: reactive
>>> execution.checkpointing.interval: 10s
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> kubernetes.cluster-id: {{ $fullName }}
>>> high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>>> hive.s3.use-instance-credentials: true
>>> kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>> 
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>> name: {{ $fullName }}
>>> namespace: {{ $fullName }}
>>> labels:
>>> app: {{ $appName }}
>>> chart: {{ template "thoros.chart" . }}
>>> release: {{ .Release.Name }}
>>> heritage: {{ .Release.Service }}
>>> 
>>> rules:
>>> - apiGroups: [""]
>>> resources: ["configmaps"]
>>> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>> 
>>> aws IAM policy
>>> {
>>>     "Version": "2012-10-17",
>>>     "Statement": [
>>>         {
>>>             "Action": [
>>>                 "s3:ListBucket",
>>>                 "s3:Get*",
>>>                 "s3:Put*",
>>>                 "s3:Delete*"
>>>             ],
>>>             "Resource": [
>>>                 "arn:aws:s3:::<company>-flink-dev/*"
>>>             ],
>>>             "Effect": "Allow"
>>>         }
>>>     ]
>>> }
>>> 
>>> *Error-log:*
>>> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner                    [] - Executing pipeline using FlinkRunner.
>>> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner                    [] - For maximum performance you should set the 'fasterCopy' option. See more at https://issues.apache.org/jira/browse/BEAM-11146
>>> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner                    [] - Translating pipeline to Flink program.
>>> 2021-08-26 13:08:43,456 INFO  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found unbounded PCollection. Switching to streaming execution.
>>> 2021-08-26 13:08:43,461 INFO  org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating a Streaming Environment.
>>> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, thoros-jobmanager
>>> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
>>> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>>> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122
>>> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
>>> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: blob.server.port, 6124
>>> 2021-08-26 13:08:43,464 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: queryable-state.proxy.ports, 6125
>>> 2021-08-26 13:08:43,464 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 2
>>> 2021-08-26 13:08:43,465 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: scheduler-mode, reactive
>>> 2021-08-26 13:08:43,465 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.interval, 10s
>>> 2021-08-26 13:08:43,466 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy, fixed-delay
>>> 2021-08-26 13:08:43,466 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 10
>>> 2021-08-26 13:08:43,466 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> 2021-08-26 13:08:43,467 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.cluster-id, thoros
>>> 2021-08-26 13:08:43,467 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.storageDir, s3://<company>-flink-dev/recovery
>>> 2021-08-26 13:08:43,468 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: hive.s3.use-instance-credentials, true
>>> 2021-08-26 13:08:43,468 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.namespace, thoros
>>> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner                    [] - Starting execution of Flink program.
>>> 2021-08-26 13:08:45,444 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
>>> 2021-08-26 13:08:45,454 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
>>> 2021-08-26 13:08:45,486 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:45,498 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:46,152 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>>> 2021-08-26 13:08:46,169 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,213 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,231 WARN  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed to delete blob at s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>>> 2021-08-26 13:08:46,239 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to submit job 00000000000000000000000000000000.
>>> java.lang.RuntimeException: java.lang.Exception: Could not open output stream for state backend
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_302]
>>> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_302]
>>> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> Caused by: java.lang.Exception: Could not open output stream for state backend
>>> at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> ... 27 more
>>> Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: V0BWCA4RDVE0EVK8; S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=; Proxy: null), S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o= (Path: s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573) ~[?:?]
>>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560) ~[?:?]
>>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>>> at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
>>> at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
>>> at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> ... 27 more
>>> -- 
>>> *Kind regards*
>>> *Jonas Eyob*
> 
> 
> -- 
> *Kind regards*
> *Jonas Eyob*

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Posted by jonas eyob <jo...@gmail.com>.
Hey Matthias,

Yes, I followed the documentation at the link you provided and decided to
go with the recommended approach of using IAM roles.
I took the hive.s3.use-instance-credentials configuration parameter from [1]
(first bullet), since I am using the flink-s3-fs-presto plugin. It says:

..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*, is
based on code from the Presto project <https://prestodb.io/>. You can
configure it using the same configuration keys as the Presto file system
<https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
by adding the configurations to your flink-conf.yaml. The Presto S3
implementation is the recommended file system for checkpointing to S3....

It's possible I am misunderstanding it.

Best,
Jonas

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
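
A possible lead, sketched under assumptions: in the stack trace the 403 is
raised from getS3ObjectMetadata, that is, while the file system checks whether
the object already exists. s3:ListBucket is a bucket-level action, so it is not
granted when the only Resource is the object pattern arn:aws:s3:::<bucket>/*,
and without it S3 answers a metadata request for a missing key with 403 rather
than 404. The Python sketch below flags that pattern in a policy document. The
function names, the short list of bucket-level actions, and the bucket name
example-flink-dev are illustrative assumptions, not part of the original setup.

```python
# Bucket-level S3 actions (illustrative, not exhaustive). They apply to a
# bucket ARN such as arn:aws:s3:::my-bucket, not to object ARNs (my-bucket/*).
# Wildcard actions such as "s3:List*" are not expanded by this sketch.
BUCKET_LEVEL_ACTIONS = {"s3:ListBucket", "s3:GetBucketLocation"}

S3_ARN_PREFIX = "arn:aws:s3:::"


def is_bucket_arn(resource):
    """True for arn:aws:s3:::bucket, False for arn:aws:s3:::bucket/key."""
    return (
        resource.startswith(S3_ARN_PREFIX)
        and "/" not in resource[len(S3_ARN_PREFIX):]
    )


def as_list(value):
    """IAM allows a single string where a list is expected."""
    return [value] if isinstance(value, str) else list(value)


def ineffective_bucket_actions(policy):
    """Bucket-level actions in Allow statements that name no bucket ARN."""
    findings = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    for stmt in statements:
        if stmt.get("Effect") != "Allow":
            continue
        resources = as_list(stmt.get("Resource", []))
        if any(r == "*" or is_bucket_arn(r) for r in resources):
            continue  # at least one resource can match a bucket
        for action in as_list(stmt.get("Action", [])):
            if action in BUCKET_LEVEL_ACTIONS:
                findings.append(action)
    return findings


# Same shape as the policy in the original post; the bucket name is hypothetical.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"],
            "Resource": ["arn:aws:s3:::example-flink-dev/*"],
            "Effect": "Allow",
        }
    ],
}

# Candidate fix: also list the bucket itself as a resource.
fixed = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"],
            "Resource": [
                "arn:aws:s3:::example-flink-dev",
                "arn:aws:s3:::example-flink-dev/*",
            ],
            "Effect": "Allow",
        }
    ],
}

print(ineffective_bucket_actions(policy))  # ['s3:ListBucket']
print(ineffective_bucket_actions(fixed))   # []
```

If the check reports s3:ListBucket, adding the bucket ARN itself to the
Resource list is a cheap thing to try before looking at credentials.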


-- 
*Kind regards*
*Jonas Eyob*

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Jonas,
Have you included the S3 credentials in the Flink config file as described
in [1]? I'm not sure that hive.s3.use-instance-credentials is a valid
configuration parameter.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
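
To illustrate the doubt about the key name: as far as I understand it (an
assumption worth checking against the Flink version in use), the
flink-s3-fs-presto plugin only forwards flink-conf.yaml keys that start with
s3. or presto.s3. to the Presto file system, rewriting them to the presto.s3.
prefix. Under that assumption a hive.s3.* key would be ignored. The Python
sketch below only models that prefix rule; the constant and function names and
the bucket name are hypothetical and none of this is Flink code.

```python
# Assumed prefix rule, modeled on my reading of the flink-s3-fs-presto plugin.
# Verify against the Flink version you actually run.
FLINK_PREFIXES = ("s3.", "presto.s3.")
HADOOP_PREFIX = "presto.s3."


def forwarded_keys(flink_conf):
    """Return the keys that would reach the file system under the assumed rule."""
    forwarded = {}
    for key, value in flink_conf.items():
        for prefix in FLINK_PREFIXES:
            if key.startswith(prefix):
                # Swap the Flink-side prefix for the Hadoop-side one.
                forwarded[HADOOP_PREFIX + key[len(prefix):]] = value
                break
    return forwarded


conf = {
    "hive.s3.use-instance-credentials": "true",   # key used in the thread
    "s3.use-instance-credentials": "true",        # candidate replacement
    "high-availability.storageDir": "s3://example-bucket/recovery",
}

print(forwarded_keys(conf))
# {'presto.s3.use-instance-credentials': 'true'}
```

Only the s3.-prefixed entry survives in this model, so renaming the key may be
worth a try if instance credentials are not being picked up.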

On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jo...@gmail.com> wrote:

> Hey,
>
> I am setting up HA on a standalone Kubernetes Flink application job
> cluster.
> Flink (1.12.5) is used and I am using S3 as the storage backend
>
> * The JobManager fails shortly after it starts with the following errors
> (apologies in advance for the length), and I can't understand what's going
> on.
> * First I thought it may be due to missing Delete privileges of the IAM
> role and updated that, but the problem persists.
> * The S3 bucket configured s3://<company>/recovery is empty.
>
> configmap.yaml
> flink-conf.yaml: |+
> jobmanager.rpc.address: {{ $fullName }}-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 1600m
> taskmanager.numberOfTaskSlots: 2
> taskmanager.rpc.port: 6122
> taskmanager.memory.process.size: 1728m
> blob.server.port: 6124
> queryable-state.proxy.ports: 6125
> parallelism.default: 2
> scheduler-mode: reactive
> execution.checkpointing.interval: 10s
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> kubernetes.cluster-id: {{ $fullName }}
> high-availability.storageDir: s3://<company>-flink-{{ .Values.environment
> }}/recovery
> hive.s3.use-instance-credentials: true
> kubernetes.namespace: {{ $fullName }} # The namespace that will be used
> for running the jobmanager and taskmanager pods
>
> role.yaml
> kind: Role
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
> name: {{ $fullName }}
> namespace: {{ $fullName }}
> labels:
> app: {{ $appName }}
> chart: {{ template "thoros.chart" . }}
> release: {{ .Release.Name }}
> heritage: {{ .Release.Service }}
>
> rules:
> - apiGroups: [""]
> resources: ["configmaps"]
> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>
> aws IAM policy
> {
>     "Version": "2012-10-17",
>     "Statement": [
>         {
>             "Action": [
>                 "s3:ListBucket",
>                 "s3:Get*",
>                 "s3:Put*",
>                 "s3:Delete*"
>             ],
>             "Resource": [
>                 "arn:aws:s3:::<company>-flink-dev/*"
>             ],
>             "Effect": "Allow"
>         }
>     ]
> }
>
> *Error-log:*
> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Executing pipeline using FlinkRunner.
> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
>                  [] - For maximum performance you should set the
> 'fasterCopy' option. See more at
> https://issues.apache.org/jira/browse/BEAM-11146
> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Translating pipeline to Flink program.
> 2021-08-26 13:08:43,456 INFO
>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
> unbounded PCollection. Switching to streaming execution.
> 2021-08-26 13:08:43,461 INFO
>  org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating
> a Streaming Environment.
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.address, thoros-jobmanager
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 2
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.rpc.port, 6122
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: blob.server.port, 6124
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: queryable-state.proxy.ports, 6125
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: parallelism.default, 2
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: scheduler-mode, reactive
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: execution.checkpointing.interval, 10s
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy, fixed-delay
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy.fixed-delay.attempts, 10
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability,
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.cluster-id, thoros
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability.storageDir,
> s3://<company>-flink-dev/recovery
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: hive.s3.use-instance-credentials, true
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.namespace, thoros
> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Starting execution of Flink program.
> 2021-08-26 13:08:45,444 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 00000000000000000000000000000000 is submitted.
> 2021-08-26 13:08:45,454 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2021-08-26 13:08:45,486 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received
> JobGraph submission 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:45,498 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] -
> Submitting job 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:46,152 INFO
>  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed
> job graph 00000000000000000000000000000000 from
> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
> 2021-08-26 13:08:46,169 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,213 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,231 WARN
>  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed
> to delete blob at
> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
> 2021-08-26 13:08:46,239 ERROR
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to
> submit job 00000000000000000000000000000000.
> java.lang.RuntimeException: java.lang.Exception: Could not open output
> stream for state backend
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_302]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> Caused by: java.lang.Exception: Could not open output stream for state
> backend
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> Caused by:
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> V0BWCA4RDVE0EVK8; S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
> Proxy: null), S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
> (Path:
> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
> ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> ~[?:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> --
> *Kind regards*
> *Jonas Eyob*
>