Posted to dev@flink.apache.org by "Lichuan Shang (Jira)" <ji...@apache.org> on 2022/08/16 04:50:00 UTC
[jira] [Created] (FLINK-28983) using serviceaccount in FlinkDeployment not works when sink to aws s3
Lichuan Shang created FLINK-28983:
-------------------------------------
Summary: using serviceaccount in FlinkDeployment not works when sink to aws s3
Key: FLINK-28983
URL: https://issues.apache.org/jira/browse/FLINK-28983
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Lichuan Shang
I am deploying a Flink CDC job based on the SQL runner example from the official examples (see https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example).
The `flink` service account has full S3 permissions (`s3:*` in its IAM policy), but the Kubernetes pod keeps restarting, and the pod log is full of errors such as:
```
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:322)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBucketWriter(StreamingFileSink.java:428)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:438)
at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:96)
at org.apache.flink.connector.file.table.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:81)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1438)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1374)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:392)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 25 more
```
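The report does not say how the `flink` service account is bound to its IAM role. Assuming the cluster is Amazon EKS using IAM Roles for Service Accounts (IRSA), a first check is whether the pod actually received the web identity credentials. The minimal sketch below (the class name `IrsaEnvCheck` is hypothetical) looks for the `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` environment variables that the EKS pod identity webhook injects, and checks that the token file is readable. It only inspects the environment and never calls AWS.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical diagnostic, assuming Amazon EKS with IAM Roles for Service
// Accounts (IRSA). It only inspects the environment and never calls AWS.
class IrsaEnvCheck {
    // Variables the EKS pod identity webhook injects into pods whose service
    // account is annotated with an IAM role.
    static final String ROLE_ARN = "AWS_ROLE_ARN";
    static final String TOKEN_FILE = "AWS_WEB_IDENTITY_TOKEN_FILE";

    // Returns a short, human-readable diagnosis for the given environment.
    static String diagnose(Map<String, String> env) {
        String roleArn = env.get(ROLE_ARN);
        String tokenFile = env.get(TOKEN_FILE);
        if (roleArn == null || tokenFile == null) {
            return "IRSA variables missing: the pod did not receive the service account's IAM role";
        }
        if (!Files.isReadable(Paths.get(tokenFile))) {
            return "IRSA token file not readable: " + tokenFile;
        }
        return "IRSA configured for role " + roleArn;
    }

    public static void main(String[] args) {
        System.out.println(diagnose(System.getenv()));
    }
}
```

Run inside the JobManager or TaskManager container, it only tells you whether the role reached the pod; if it did, the failure lies elsewhere, and this sketch cannot tell where.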
As an experiment, I tried static access keys (AK/SK), which is not recommended, to see whether that would help. After setting `s3.access-key` and `s3.secret-key`, the pod stays in the Running state.
Here is my config file:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sql-example-stateful-s3
  namespace: flink
spec:
  image: 11122233344455.dkr.ecr.cn-northwest-1.amazonaws.com.cn/flink/flink-sql-runner-example:latest
  imagePullPolicy: Always
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://bucket/flink/flink-data/savepoints
    state.checkpoints.dir: s3://bucket/flink/flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://bucket/flink/flink-data/ha
    execution.checkpointing.interval: "10000"
    state.backend: filesystem
    fs.s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
    env.java.opts: -Dcom.amazonaws.services.s3.enableV4
    s3.access-key: <AWS_ACCESS_KEY>
    s3.secret-key: <AWS_SECRET_ACCESS_KEY>
    s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/sql-runner.jar
    args: ["/opt/flink/usrlib/sql-scripts/orders.sql"]
    parallelism: 1
    upgradeMode: last-state
```
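The config sets both `fs.s3a.endpoint` and `s3a.endpoint`, and passes the static keys as `s3.access-key` and `s3.secret-key`. As we understand Flink's S3 filesystem plugins, options with the prefixes `s3.`, `s3a.` and `fs.s3a.` are forwarded to Hadoop S3A under the `fs.s3a.` prefix, and a few dashed keys are mirrored to the dotted names S3A reads. The sketch below is a simplified, hypothetical model of that mapping (the class `S3KeyTranslation` is not Flink code), applied to options from the FlinkDeployment above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of how Flink's S3 plugins are understood to
// forward Flink options to Hadoop S3A; this is not Flink's actual code.
class S3KeyTranslation {
    // Flink option prefixes that are rewritten to the Hadoop "fs.s3a." prefix.
    static final String[] FLINK_PREFIXES = {"s3.", "s3a.", "fs.s3a."};
    static final String HADOOP_PREFIX = "fs.s3a.";

    // Dashed (Flink-style) keys copied to the dotted names S3A reads.
    static final String[][] MIRRORED_KEYS = {
        {"fs.s3a.access-key", "fs.s3a.access.key"},
        {"fs.s3a.secret-key", "fs.s3a.secret.key"},
        {"fs.s3a.path-style-access", "fs.s3a.path.style.access"},
    };

    static Map<String, String> toHadoop(Map<String, String> flinkConf) {
        Map<String, String> hadoop = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            for (String prefix : FLINK_PREFIXES) {
                if (e.getKey().startsWith(prefix)) {
                    // Strip the Flink prefix and re-add the key under "fs.s3a.".
                    String suffix = e.getKey().substring(prefix.length());
                    hadoop.put(HADOOP_PREFIX + suffix, e.getValue());
                }
            }
        }
        // Copy dashed keys to their dotted S3A equivalents.
        for (String[] pair : MIRRORED_KEYS) {
            String value = hadoop.get(pair[0]);
            if (value != null) {
                hadoop.put(pair[1], value);
            }
        }
        return hadoop;
    }

    public static void main(String[] args) {
        // Keys without an S3 prefix (e.g. state.backend) are not forwarded.
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("state.backend", "filesystem");
        flinkConf.put("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn");
        flinkConf.put("s3.access-key", "<AWS_ACCESS_KEY>");
        flinkConf.put("s3.secret-key", "<AWS_SECRET_ACCESS_KEY>");
        flinkConf.put("s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn");
        toHadoop(flinkConf).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Under this model, both endpoint options collapse into the same `fs.s3a.endpoint` key, so one of them is redundant, and `s3.access-key` reaches S3A as `fs.s3a.access.key`, the option S3A reads for a static access key.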
--
This message was sent by Atlassian Jira
(v8.20.10#820010)