Posted to dev@flink.apache.org by "Lichuan Shang (Jira)" <ji...@apache.org> on 2022/08/16 04:50:00 UTC

[jira] [Created] (FLINK-28983) Using serviceAccount in FlinkDeployment does not work when sinking to AWS S3

Lichuan Shang created FLINK-28983:
-------------------------------------

             Summary: Using serviceAccount in FlinkDeployment does not work when sinking to AWS S3
                 Key: FLINK-28983
                 URL: https://issues.apache.org/jira/browse/FLINK-28983
             Project: Flink
          Issue Type: Bug
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.1.0
            Reporter: Lichuan Shang


I am deploying a Flink CDC job using the sql-runner example from the official examples (see https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example).

The `flink` service account has all S3 permissions (`s3:*` in the IAM policy), but the k8s pod keeps restarting and there are many errors in the pod's log:

```
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
        at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:322)
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBucketWriter(StreamingFileSink.java:428)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:438)
        at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:96)
        at org.apache.flink.connector.file.table.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:81)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1438)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1374)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:392)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
        ... 25 more
```

 

I tried using a static access key/secret key pair (AKSK, which is not recommended) to see whether that would help. After setting `s3.access-key` and `s3.secret-key`, the pod does stay in the Running state.


Here is my config file:


```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sql-example-stateful-s3
  namespace: flink
spec:
  image: 11122233344455.dkr.ecr.cn-northwest-1.amazonaws.com.cn/flink/flink-sql-runner-example:latest
  imagePullPolicy: Always
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://bucket/flink/flink-data/savepoints
    state.checkpoints.dir: s3://bucket/flink/flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://bucket/flink/flink-data/ha
    execution.checkpointing.interval: "10000"
    state.backend: filesystem
    fs.s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
    env.java.opts: -Dcom.amazonaws.services.s3.enableV4
    s3.access-key: <AWS_ACCESS_KEY>
    s3.secret-key: <AWS_SECRET_ACCESS_KEY>
    s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/sql-runner.jar
    args: ["/opt/flink/usrlib/sql-scripts/orders.sql"]
    parallelism: 1
    upgradeMode: last-state
```
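For reference, this is roughly the configuration I would expect to work with the service account alone (no AKSK). Note that the `fs.s3a.aws.credentials.provider` line is an assumption on my part: it points the s3a connector at the web-identity token that IRSA mounts into the pod, instead of the default credential chain. I have not confirmed that it fixes this deployment.

```yaml
# Sketch only: the relevant flinkConfiguration without static keys.
# fs.s3a.aws.credentials.provider is an assumption (untested here); it
# tells hadoop-aws to authenticate via the IRSA web-identity token.
spec:
  flinkConfiguration:
    fs.s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
  # The service account must carry the eks.amazonaws.com/role-arn annotation
  # so that the pod receives the web-identity token.
  serviceAccount: flink
```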



--
This message was sent by Atlassian Jira
(v8.20.10#820010)