Posted to user@flink.apache.org by jonas eyob <jo...@gmail.com> on 2021/08/26 13:42:38 UTC
Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Hey,
I am setting up HA on a standalone Kubernetes Flink application job
cluster.
I am using Flink 1.12.5 with S3 as the storage backend.
* The JobManager fails shortly after it starts with the following errors
(apologies in advance for the length), and I can't understand what's going
on.
* At first I thought it might be due to missing Delete privileges on the IAM
role, so I added them, but the problem persists.
* The configured S3 bucket, s3://<company>/recovery, is empty.
configmap.yaml
flink-conf.yaml: |+
  jobmanager.rpc.address: {{ $fullName }}-jobmanager
  jobmanager.rpc.port: 6123
  jobmanager.memory.process.size: 1600m
  taskmanager.numberOfTaskSlots: 2
  taskmanager.rpc.port: 6122
  taskmanager.memory.process.size: 1728m
  blob.server.port: 6124
  queryable-state.proxy.ports: 6125
  parallelism.default: 2
  scheduler-mode: reactive
  execution.checkpointing.interval: 10s
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  kubernetes.cluster-id: {{ $fullName }}
  high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
  hive.s3.use-instance-credentials: true
  kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
role.yaml
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ $fullName }}
  namespace: {{ $fullName }}
  labels:
    app: {{ $appName }}
    chart: {{ template "thoros.chart" . }}
    release: {{ .Release.Name }}
    heritage: {{ .Release.Service }}
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
AWS IAM policy
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:ListBucket",
        "s3:Get*",
        "s3:Put*",
        "s3:Delete*"
      ],
      "Resource": [
        "arn:aws:s3:::<company>-flink-dev/*"
      ],
      "Effect": "Allow"
    }
  ]
}
*Error-log:*
2021-08-26 13:08:43,439 INFO org.apache.beam.runners.flink.FlinkRunner
[] - Executing pipeline using FlinkRunner.
2021-08-26 13:08:43,444 WARN org.apache.beam.runners.flink.FlinkRunner
[] - For maximum performance you should set the 'fasterCopy'
option. See more at https://issues.apache.org/jira/browse/BEAM-11146
2021-08-26 13:08:43,451 INFO org.apache.beam.runners.flink.FlinkRunner
[] - Translating pipeline to Flink program.
2021-08-26 13:08:43,456 INFO
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
unbounded PCollection. Switching to streaming execution.
2021-08-26 13:08:43,461 INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating
a Streaming Environment.
2021-08-26 13:08:43,462 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: jobmanager.rpc.address, thoros-jobmanager
2021-08-26 13:08:43,462 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: jobmanager.rpc.port, 6123
2021-08-26 13:08:43,462 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2021-08-26 13:08:43,463 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 2
2021-08-26 13:08:43,463 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: taskmanager.rpc.port, 6122
2021-08-26 13:08:43,463 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2021-08-26 13:08:43,463 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: blob.server.port, 6124
2021-08-26 13:08:43,464 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: queryable-state.proxy.ports, 6125
2021-08-26 13:08:43,464 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: parallelism.default, 2
2021-08-26 13:08:43,465 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: scheduler-mode, reactive
2021-08-26 13:08:43,465 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: execution.checkpointing.interval, 10s
2021-08-26 13:08:43,466 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: restart-strategy, fixed-delay
2021-08-26 13:08:43,466 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: restart-strategy.fixed-delay.attempts, 10
2021-08-26 13:08:43,466 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: high-availability,
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-08-26 13:08:43,467 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: kubernetes.cluster-id, thoros
2021-08-26 13:08:43,467 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: high-availability.storageDir,
s3://<company>-flink-dev/recovery
2021-08-26 13:08:43,468 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: hive.s3.use-instance-credentials, true
2021-08-26 13:08:43,468 INFO
org.apache.flink.configuration.GlobalConfiguration [] - Loading
configuration property: kubernetes.namespace, thoros
2021-08-26 13:08:45,232 INFO org.apache.beam.runners.flink.FlinkRunner
[] - Starting execution of Flink program.
2021-08-26 13:08:45,444 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Job 00000000000000000000000000000000 is submitted.
2021-08-26 13:08:45,454 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Submitting Job with JobId=00000000000000000000000000000000.
2021-08-26 13:08:45,486 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received
JobGraph submission 00000000000000000000000000000000
(main0-flink-0826130845-6f3e805f).
2021-08-26 13:08:45,498 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Submitting job 00000000000000000000000000000000
(main0-flink-0826130845-6f3e805f).
2021-08-26 13:08:46,152 INFO
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
job graph 00000000000000000000000000000000 from
KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
2021-08-26 13:08:46,169 INFO
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
Clean up the high availability data for job
00000000000000000000000000000000.
2021-08-26 13:08:46,213 INFO
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
Finished cleaning up the high availability data for job
00000000000000000000000000000000.
2021-08-26 13:08:46,231 WARN
org.apache.flink.runtime.blob.FileSystemBlobStore [] - Failed
to delete blob at
s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
2021-08-26 13:08:46,239 ERROR
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to
submit job 00000000000000000000000000000000.
java.lang.RuntimeException: java.lang.Exception: Could not open output
stream for state backend
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
~[?:1.8.0_302]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
~[?:1.8.0_302]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_302]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.Actor.aroundReceive(Actor.scala:517)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
Caused by: java.lang.Exception: Could not open output stream for state
backend
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
Caused by:
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
V0BWCA4RDVE0EVK8; S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
Proxy: null), S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
(Path:
s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
~[?:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
~[?:?]
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
--
*Kind regards*
*Jonas Eyob*
Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Posted by jonas eyob <jo...@gmail.com>.
@Svend - that seems to have done the trick; adding the bucket itself as a
resource got Flink to write to the configured S3 bucket.
@Gil - we manage our Kubernetes cluster on AWS with kops, but we do assign
the IAM roles through the deployment annotations. It seems Presto is able to
use the s3:// scheme in our case.
Thanks both!
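
Some background on why the bucket ARN matters: s3:ListBucket is a bucket-level action, so IAM only honours it when the statement's Resource includes the bucket ARN itself; granted on "<bucket>/*" alone it has no effect. Without ListBucket, S3 answers a HEAD request for a key that does not exist with 403 rather than 404, which fits the stack trace in the original post, where PrestoS3FileSystem.create first checks for the not-yet-existing job graph file via getS3ObjectMetadata. The following is a minimal Python sketch that flags this pattern in a policy document. The helper names and the bucket name example-flink-dev are made up for illustration (the real bucket name is redacted in this thread), and only s3:ListBucket is treated as bucket-level here.

```python
# Actions that S3 evaluates against the bucket ARN rather than object ARNs.
# Illustrative subset only: S3 defines more bucket-level actions than this.
BUCKET_LEVEL_ACTIONS = {"s3:ListBucket"}

S3_ARN_PREFIX = "arn:aws:s3:::"


def as_list(value):
    """IAM accepts either a single string or a list for Action/Resource."""
    return [value] if isinstance(value, str) else list(value)


def is_bucket_arn(arn):
    """True for 'arn:aws:s3:::bucket' (or '*'), False for 'arn:aws:s3:::bucket/key'."""
    if arn == "*":
        return True
    return arn.startswith(S3_ARN_PREFIX) and "/" not in arn[len(S3_ARN_PREFIX):]


def ineffective_bucket_actions(policy):
    """Return bucket-level actions that are allowed only on object ARNs."""
    statements = policy["Statement"]
    if isinstance(statements, dict):
        statements = [statements]
    problems = []
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        resources = as_list(statement.get("Resource", []))
        if any(is_bucket_arn(resource) for resource in resources):
            continue  # a bucket ARN is present, bucket-level actions can apply
        for action in as_list(statement.get("Action", [])):
            if action in BUCKET_LEVEL_ACTIONS:
                problems.append(action)
    return problems


# Shape of the policy from the original post (placeholder bucket name).
ORIGINAL_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Action": ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"],
        "Resource": ["arn:aws:s3:::example-flink-dev/*"],
        "Effect": "Allow",
    }],
}

# Shape of the policy Svend suggested: bucket ARN plus object ARN.
FIXED_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Action": ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"],
        "Resource": [
            "arn:aws:s3:::example-flink-dev",
            "arn:aws:s3:::example-flink-dev/*",
        ],
        "Effect": "Allow",
    }],
}

if __name__ == "__main__":
    print(ineffective_bucket_actions(ORIGINAL_POLICY))
    print(ineffective_bucket_actions(FIXED_POLICY))
```

The check is deliberately narrow: it ignores Deny statements, conditions, and wildcard actions such as "s3:*", so it is a quick sanity check rather than a policy evaluator.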
On Thu, 26 Aug 2021 at 17:59, Gil De Grove <gi...@euranova.eu> wrote:
> Hi Jonas,
>
>
>
> Just wondering, are you trying to deploy via IAM service account
> annotations in an AWS EKS cluster?
>
> We noticed that when using Presto, the IAM service account was using the
> EC2 metadata API inside AWS. However, when using an EKS service account, the
> API used is the web token auth.
>
> Not sure if the solution we found is the appropriate one, but switching to
> s3a instead of Presto, and forcing the AWS defaultProviderChain, did the
> trick.
>
> Maybe you could try that.
>
> Regards,
> Gil
>
> On Thu, Aug 26, 2021, 18:45 Svend <st...@svend.xyz> wrote:
>
>> Hi Jonas,
>>
>> Just a thought, could you try this policy? If I recall correctly, I think
>> you need ListBucket on the bucket itself, whereas the other can have a path
>> prefix like the "/*" you added
>>
>> "
>> {
>> "Version": "2012-10-17",
>> "Statement": [
>> {
>> "Action": [
>> "s3:ListBucket",
>> "s3:Get*",
>> "s3:Put*",
>> "s3:Delete*"
>> ],
>> "Resource": [
>> "arn:aws:s3:::<company>-flink-dev",
>> "arn:aws:s3:::<company>-flink-dev/*"
>> ],
>> "Effect": "Allow"
>> }
>> ]
>> }
>> "
>>
>> Svend
>>
>>
>> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>>
>> Hey Matthias,
>>
>> Yes, I have followed the documentation on the link you provided - and
>> decided to go for the recommended approach of using IAM roles.
>> The hive.s3.use-instance-credentials configuration parameter I got from
>> [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which
>> says:
>>
>> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
>> is based on code from the Presto project <https://prestodb.io/>. You can
>> configure it using the same configuration keys as the Presto file system
>> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
>> by adding the configurations to your flink-conf.yaml. The Presto S3
>> implementation is the recommended file system for checkpointing to S3....
>>
>> It's possible I am misunderstanding it?
>>
>> Best,
>> Jonas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>
>> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <
>> matthias@ververica.com> wrote:
>>
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's
>> described in [1]? I'm not sure about this hive.s3.use-instance-credentials
>> being a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jo...@gmail.com> wrote:
>>
>> Hey,
>>
>> I am setting up HA on a standalone Kubernetes Flink application job
>> cluster.
>> Flink (1.12.5) is used and I am using S3 as the storage backend
>>
>> * The JobManager shortly fails after starts with the following errors
>> (apologies in advance for the length), and I can't understand what's going
>> on.
>> * First I thought it may be due to missing Delete privileges of the IAM
>> role and updated that, but the problem persists.
>> * The S3 bucket configured s3://<company>/recovery is empty.
>>
>> configmap.yaml
>> flink-conf.yaml: |+
>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.numberOfTaskSlots: 2
>> taskmanager.rpc.port: 6122
>> taskmanager.memory.process.size: 1728m
>> blob.server.port: 6124
>> queryable-state.proxy.ports: 6125
>> parallelism.default: 2
>> scheduler-mode: reactive
>> execution.checkpointing.interval: 10s
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> kubernetes.cluster-id: {{ $fullName }}
>> high-availability.storageDir: s3://<company>-flink-{{ .Values.environment
>> }}/recovery
>> hive.s3.use-instance-credentials: true
>> kubernetes.namespace: {{ $fullName }} # The namespace that will be used
>> for running the jobmanager and taskmanager pods
>>
>> role.yaml
>> kind: Role
>> apiVersion: rbac.authorization.k8s.io/v1
>> metadata:
>> name: {{ $fullName }}
>> namespace: {{ $fullName }}
>> labels:
>> app: {{ $appName }}
>> chart: {{ template "thoros.chart" . }}
>> release: {{ .Release.Name }}
>> heritage: {{ .Release.Service }}
>>
>> rules:
>> - apiGroups: [""]
>> resources: ["configmaps"]
>> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>
>> aws IAM policy
>> {
>> "Version": "2012-10-17",
>> "Statement": [
>> {
>> "Action": [
>> "s3:ListBucket",
>> "s3:Get*",
>> "s3:Put*",
>> "s3:Delete*"
>> ],
>> "Resource": [
>> "arn:aws:s3:::<company>-flink-dev/*"
>> ],
>> "Effect": "Allow"
>> }
>> ]
>> }
>>
>> *Error-log:*
>> 2021-08-26 13:08:43,439 INFO org.apache.beam.runners.flink.FlinkRunner
>> [] - Executing pipeline using FlinkRunner.
>> 2021-08-26 13:08:43,444 WARN org.apache.beam.runners.flink.FlinkRunner
>> [] - For maximum performance you should set the
>> 'fasterCopy' option. See more at
>> https://issues.apache.org/jira/browse/BEAM-11146
>> 2021-08-26 13:08:43,451 INFO org.apache.beam.runners.flink.FlinkRunner
>> [] - Translating pipeline to Flink program.
>> 2021-08-26 13:08:43,456 INFO
>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
>> unbounded PCollection. Switching to streaming execution.
>> 2021-08-26 13:08:43,461 INFO
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating
>> a Streaming Environment.
>> 2021-08-26 13:08:43,462 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: jobmanager.rpc.address, thoros-jobmanager
>> 2021-08-26 13:08:43,462 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2021-08-26 13:08:43,462 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: jobmanager.memory.process.size, 1600m
>> 2021-08-26 13:08:43,463 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: taskmanager.numberOfTaskSlots, 2
>> 2021-08-26 13:08:43,463 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: taskmanager.rpc.port, 6122
>> 2021-08-26 13:08:43,463 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: taskmanager.memory.process.size, 1728m
>> 2021-08-26 13:08:43,463 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: blob.server.port, 6124
>> 2021-08-26 13:08:43,464 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: queryable-state.proxy.ports, 6125
>> 2021-08-26 13:08:43,464 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: parallelism.default, 2
>> 2021-08-26 13:08:43,465 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: scheduler-mode, reactive
>> 2021-08-26 13:08:43,465 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: execution.checkpointing.interval, 10s
>> 2021-08-26 13:08:43,466 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: restart-strategy, fixed-delay
>> 2021-08-26 13:08:43,466 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: restart-strategy.fixed-delay.attempts, 10
>> 2021-08-26 13:08:43,466 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: high-availability,
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> 2021-08-26 13:08:43,467 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: kubernetes.cluster-id, thoros
>> 2021-08-26 13:08:43,467 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: high-availability.storageDir,
>> s3://<company>-flink-dev/recovery
>> 2021-08-26 13:08:43,468 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: hive.s3.use-instance-credentials, true
>> 2021-08-26 13:08:43,468 INFO
>> org.apache.flink.configuration.GlobalConfiguration [] - Loading
>> configuration property: kubernetes.namespace, thoros
>> 2021-08-26 13:08:45,232 INFO org.apache.beam.runners.flink.FlinkRunner
>> [] - Starting execution of Flink program.
>> 2021-08-26 13:08:45,444 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 00000000000000000000000000000000 is submitted.
>> 2021-08-26 13:08:45,454 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=00000000000000000000000000000000.
>> 2021-08-26 13:08:45,486 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received
>> JobGraph submission 00000000000000000000000000000000
>> (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:45,498 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
>> Submitting job 00000000000000000000000000000000
>> (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:46,152 INFO
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
>> job graph 00000000000000000000000000000000 from
>> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>> 2021-08-26 13:08:46,169 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Clean up the high availability data for job
>> 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,213 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Finished cleaning up the high availability data for job
>> 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,231 WARN
>> org.apache.flink.runtime.blob.FileSystemBlobStore [] - Failed
>> to delete blob at
>> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>> 2021-08-26 13:08:46,239 ERROR
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to
>> submit job 00000000000000000000000000000000.
>> java.lang.RuntimeException: java.lang.Exception: Could not open output
>> stream for state backend
>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>> ~[?:1.8.0_302]
>> at
>> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
>> ~[?:1.8.0_302]
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> ~[?:1.8.0_302]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> Caused by: java.lang.Exception: Could not open output stream for state
>> backend
>> at
>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> ... 27 more
>> Caused by:
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
>> V0BWCA4RDVE0EVK8; S3 Extended Request ID:
>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
>> Proxy: null), S3 Extended Request ID:
>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
>> (Path:
>> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
>> ~[?:?]
>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>> ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>> ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
>> ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>> at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
>> ~[?:?]
>> at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
>> ~[?:?]
>> at
>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> ... 27 more
>> --
>> *Best regards*
>> *Jonas Eyob*
>>
>>
>>
>> --
>> *Best regards*
>> *Jonas Eyob*
>>
>>
>>
--
*Best regards*
*Jonas Eyob*
Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Posted by Gil De Grove <gi...@euranova.eu>.
Hi Jonas,
Just wondering, are you trying to deploy via IAM service account
annotations in an AWS EKS cluster?
We noticed that when using Presto, the IAM service account was using the EC2
metadata API inside AWS. However, when using an EKS service account, the API
used is web identity token authentication.
I'm not sure the solution we found is the appropriate one, but switching to
s3a instead of Presto and forcing the AWS default credentials provider chain
did the trick.
Maybe you could try that.
Regards,
Gil
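One way to answer the question above from inside the JobManager pod is to look at its environment. As far as I know, EKS service account annotations (IRSA) inject AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE into the pod, while instance-profile credentials come from the EC2 metadata API and need no such variables. The sketch below is a minimal Python check under that assumption. The function name and the returned labels are made up for illustration and are not part of Flink or the AWS SDK.

```python
import os
from typing import Mapping

# Environment variables that the EKS pod identity webhook is expected to inject
# when a service account is annotated with an IAM role (assumption: standard IRSA setup).
IRSA_VARS = ("AWS_ROLE_ARN", "AWS_WEB_IDENTITY_TOKEN_FILE")


def credential_source(env: Mapping[str, str]) -> str:
    """Guess which credential mechanism the pod is set up for (hypothetical helper)."""
    present = [name for name in IRSA_VARS if env.get(name)]
    if len(present) == len(IRSA_VARS):
        # Both variables set: the pod expects web identity token authentication.
        return "web-identity-token"
    if present:
        # Only one of the two is set: the IRSA setup looks incomplete.
        return "incomplete-irsa"
    # Neither is set: credentials must come from elsewhere, e.g. the EC2 metadata API.
    return "instance-metadata-or-other"


if __name__ == "__main__":
    print(credential_source(os.environ))
```

If this reports web-identity-token, the pod is set up for web identity authentication, and a filesystem that only consults the EC2 metadata API would pick up the node's role rather than the service account's role. That would be consistent with the 403 in this thread, though the thread itself does not confirm it.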
On Thu, Aug 26, 2021, 18:45 Svend <st...@svend.xyz> wrote:
> Hi Jonas,
>
> Just a thought, could you try this policy? If I recall correctly, you
> need ListBucket on the bucket itself, whereas the other actions can take a
> path prefix like the "/*" you added.
>
> "
> {
>   "Version": "2012-10-17",
>   "Statement": [
>     {
>       "Action": [
>         "s3:ListBucket",
>         "s3:Get*",
>         "s3:Put*",
>         "s3:Delete*"
>       ],
>       "Resource": [
>         "arn:aws:s3:::<company>-flink-dev",
>         "arn:aws:s3:::<company>-flink-dev/*"
>       ],
>       "Effect": "Allow"
>     }
>   ]
> }
> "
>
> Svend
>
>
> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>
> Hey Matthias,
>
> Yes, I have followed the documentation on the link you provided - and
> decided to go for the recommended approach of using IAM roles.
> The hive.s3.use-instance-credentials configuration parameter I got from
> [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which
> says:
>
> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
> is based on code from the Presto project <https://prestodb.io/>. You can
> configure it using the same configuration keys as the Presto file system
> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
> by adding the configurations to your flink-conf.yaml. The Presto S3
> implementation is the recommended file system for checkpointing to S3....
>
> It's possible I am misunderstanding it?
>
> Best,
> Jonas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matthias@ververica.com> wrote:
>
> Hi Jonas,
> have you included the S3 credentials in the Flink config file as
> described in [1]? I'm not sure that hive.s3.use-instance-credentials
> is a valid configuration parameter.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>
Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Posted by Svend <st...@svend.xyz>.
Hi Jonas,
Just a thought, could you try this policy? If I recall correctly, you need ListBucket on the bucket itself, whereas the other actions can take a path prefix like the "/*" you added.
"
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:ListBucket",
        "s3:Get*",
        "s3:Put*",
        "s3:Delete*"
      ],
      "Resource": [
        "arn:aws:s3:::<company>-flink-dev",
        "arn:aws:s3:::<company>-flink-dev/*"
      ],
      "Effect": "Allow"
    }
  ]
}
"
Svend
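To make the difference between the two Resource lists concrete: s3:ListBucket is checked against the bucket ARN, while the Get/Put/Delete object actions are checked against object ARNs. A pattern ending in "/*" does not match the bare bucket ARN, so the original policy leaves ListBucket ineffective. Without ListBucket, S3 answers a HEAD request for a key that does not exist with 403 instead of 404, which fits the stack trace, where create() first calls exists() on the job graph that has not been written yet. The sketch below is a simplified Python model of the wildcard matching only, not the real IAM evaluation. The bucket name and helper names are hypothetical.

```python
import re
from typing import Iterable


def arn_pattern_matches(pattern: str, arn: str) -> bool:
    """Simplified IAM-style wildcard match: '*' is any run of characters, '?' is one character."""
    regex = "".join(
        ".*" if ch == "*" else "." if ch == "?" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, arn) is not None


def covers(resources: Iterable[str], arn: str) -> bool:
    """True if any Resource pattern in the statement matches the given ARN."""
    return any(arn_pattern_matches(pattern, arn) for pattern in resources)


BUCKET_ARN = "arn:aws:s3:::example-flink-dev"  # hypothetical bucket name
OBJECT_ARN = BUCKET_ARN + "/recovery/default/submittedJobGraphe95ce29174c6"

ORIGINAL_RESOURCES = [BUCKET_ARN + "/*"]               # policy from the first message
SUGGESTED_RESOURCES = [BUCKET_ARN, BUCKET_ARN + "/*"]  # policy suggested above

if __name__ == "__main__":
    # ListBucket targets the bucket ARN; object actions target object ARNs.
    print("original covers bucket: ", covers(ORIGINAL_RESOURCES, BUCKET_ARN))
    print("original covers object: ", covers(ORIGINAL_RESOURCES, OBJECT_ARN))
    print("suggested covers bucket:", covers(SUGGESTED_RESOURCES, BUCKET_ARN))
    print("suggested covers object:", covers(SUGGESTED_RESOURCES, OBJECT_ARN))
```

Only the suggested list covers both the bucket ARN and the object ARN, which is why adding the bare bucket ARN is worth trying before changing the filesystem plugin.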
On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
>
> Yes, I have followed the documentation on the link you provided - and decided to go for the recommended approach of using IAM roles.
> The hive.s3.use-instance-credentials configuration parameter I got from [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
>
> ..`flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is based on code from the Presto project <https://prestodb.io/>. You can configure it using the same configuration keys as the Presto file system <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>, by adding the configurations to your `flink-conf.yaml`. The Presto S3 implementation is the recommended file system for checkpointing to S3....
>
> It's possible I am misunderstanding it?
>
> Best,
> Jonas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <ma...@ververica.com> wrote:
>> Hi Jonas,
>> have you included the S3 credentials in the Flink config file as described in [1]? I'm not sure that hive.s3.use-instance-credentials is a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>
>
> --
> *Med Vänliga Hälsningar*
> *Jonas Eyob*
Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Posted by jonas eyob <jo...@gmail.com>.
Hey Matthias,
Yes, I followed the documentation at the link you provided and decided to
go with the recommended approach of using IAM roles.
I took the hive.s3.use-instance-credentials configuration parameter from [1]
(first bullet), since I am using the flink-s3-fs-presto plugin. It says:
"... flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*, is
based on code from the Presto project <https://prestodb.io/>. You can
configure it using the same configuration keys as the Presto file system
<https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
by adding the configurations to your flink-conf.yaml. The Presto S3
implementation is the recommended file system for checkpointing to S3. ..."
Is it possible that I am misunderstanding it?
Best,
Jonas
[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
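Editorial note, separate from the question about the configuration key: the IAM policy posted in this thread grants s3:ListBucket only on the object ARN ("arn:aws:s3:::<company>-flink-dev/*"). s3:ListBucket is a bucket-level action, so it takes effect only when the bucket ARN itself (without "/*") is listed as a resource. Without it, S3 answers a metadata request for a key that does not exist with 403 instead of 404. The stack trace fails in getS3ObjectMetadata, during the exists() check that precedes create(), and the bucket is reported as empty, so this may be what is happening here. It is not a confirmed diagnosis. Below is a minimal Python sketch of that check. The bucket name example-flink-dev and both helper functions are hypothetical, and wildcard actions or resources are deliberately not expanded.

```python
import copy
import json

# Hypothetical bucket name standing in for the redacted "<company>-flink-dev".
BUCKET = "example-flink-dev"

# The policy as posted in this thread: all actions are scoped to object ARNs only.
POSTED_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"],
            "Resource": [f"arn:aws:s3:::{BUCKET}/*"],
            "Effect": "Allow",
        }
    ],
}


def _as_list(value):
    """IAM accepts either a single value or a list; normalise to a list."""
    return list(value) if isinstance(value, list) else [value]


def allows_list_bucket(policy, bucket):
    """Return True if an Allow statement names s3:ListBucket on the bucket ARN itself.

    Simplified on purpose: only exact string matches are checked, so
    wildcards such as "s3:*" or "arn:aws:s3:::*" are not recognised.
    """
    bucket_arn = f"arn:aws:s3:::{bucket}"
    for statement in _as_list(policy["Statement"]):
        if statement.get("Effect") != "Allow":
            continue
        actions = _as_list(statement.get("Action", []))
        resources = _as_list(statement.get("Resource", []))
        if "s3:ListBucket" in actions and bucket_arn in resources:
            return True
    return False


def with_bucket_level_list(policy, bucket):
    """Return a copy of the policy with a bucket-level ListBucket statement added."""
    fixed = copy.deepcopy(policy)
    fixed["Statement"] = _as_list(fixed["Statement"])
    fixed["Statement"].append(
        {
            "Action": ["s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{bucket}"],  # bucket ARN, no "/*"
            "Effect": "Allow",
        }
    )
    return fixed


fixed_policy = with_bucket_level_list(POSTED_POLICY, BUCKET)
print("posted policy allows ListBucket:", allows_list_bucket(POSTED_POLICY, BUCKET))
print("amended policy allows ListBucket:", allows_list_bucket(fixed_policy, BUCKET))
print(json.dumps(fixed_policy, indent=2))
```

If the check on the posted policy comes back negative, adding the bucket ARN as a resource for s3:ListBucket would be the first thing to try before changing any Flink settings.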
On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <ma...@ververica.com> wrote:
> Hi Jonas,
> Have you included the S3 credentials in the Flink config file as
> described in [1]? I'm not sure that hive.s3.use-instance-credentials
> is a valid configuration parameter.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
--
*Kind regards*
*Jonas Eyob*
Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Posted by Matthias Pohl <ma...@ververica.com>.
Hi Jonas,
Have you included the S3 credentials in the Flink config file as
described in [1]? I'm not sure that hive.s3.use-instance-credentials
is a valid configuration parameter.
Best,
Matthias
[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials