Posted to user@flink.apache.org by sambhav gupta <co...@gmail.com> on 2022/05/24 10:02:35 UTC

Flink HUDI Sink - MinIO S3 access Issue

Hi Flink User Group,

I am a Data Engineer at Thoughtworks and am facing an issue while
writing Hudi tables to an S3-protocol-compliant MinIO bucket.

*I used the Flink sql-client.sh and added flink-s3-fs-hadoop-1.13.6.jar
and hudi-flink-bundle_2.11-0.10.1.jar to the lib folder.*

*I am using Flink 1.13.6.*
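
For reference, the jars were dropped into the lib folder roughly like
this ($FLINK_HOME is a placeholder for the Flink 1.13.6 install
directory):

    cp flink-s3-fs-hadoop-1.13.6.jar $FLINK_HOME/lib/
    cp hudi-flink-bundle_2.11-0.10.1.jar $FLINK_HOME/lib/
    # restart so the new jars are picked up, then start the SQL client
    $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh
    $FLINK_HOME/bin/sql-client.sh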

I first tried writing a simple CSV table to MinIO, which worked after
adding the s3 access key and secret key properties to flink-conf.yaml.
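
The flink-conf.yaml entries for that were along these lines (the keys,
host, and port below are placeholders; the endpoint and path-style
settings are what MinIO typically needs):

    s3.access-key: <minio-access-key>
    s3.secret-key: <minio-secret-key>
    s3.endpoint: http://<minio-host>:9000
    s3.path.style.access: true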

But when I tried to create a Hudi table pointing to the MinIO bucket, I
faced an authorization issue even after configuring the s3 properties in
/etc/hadoop/core-site.xml and setting the hadoop classpath.
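
The core-site.xml s3a properties were roughly as follows (values are
again placeholders):

    <configuration>
      <property>
        <name>fs.s3a.access.key</name>
        <value>minio-access-key</value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value>minio-secret-key</value>
      </property>
      <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio-host:9000</value>
      </property>
      <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
    </configuration>

and the hadoop classpath was set with:

    export HADOOP_CLASSPATH=`hadoop classpath`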

create table t1_s3_hudi (
  id int PRIMARY KEY,
  name varchar(50)
) with (
  'connector' = 'hudi',
  'path' = 's3://test2/t1_s3_hudi',
  'table.type' = 'MERGE_ON_READ'
);

On inserting a record:

Flink SQL> insert into t1_s3_hudi values (1, 'one number s3');

I get this error:

Caused by: java.nio.file.AccessDeniedException: s3://test2/t1_s3_hudi/.hoodie: getFileStatus on s3://test2/t1_s3_hudi/.hoodie: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JCM1JYBS0DD2GFQH; S3 Extended Request ID: O601BKHby7rLa/ubYoywjKY7FVyqcknKf1IgKh31+84ApW7PvJ8j9fq53et8hA48T5WUouw2pQA=; Proxy: null), S3 Extended Request ID: O601BKHby7rLa/ubYoywjKY7FVyqcknKf1IgKh31+84ApW7PvJ8j9fq53et8hA48T5WUouw2pQA=:403 Forbidden
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:2970) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at org.apache.hudi.util.StreamerUtil.tableExists(StreamerUtil.java:290) ~[hudi-flink-bundle_2.11-0.10.1.jar:0.10.1]
        at org.apache.hudi.util.StreamerUtil.initTableIfNotExists(StreamerUtil.java:258) ~[hudi-flink-bundle_2.11-0.10.1.jar:0.10.1]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:164) ~[hudi-flink-bundle_2.11-0.10.1.jar:0.10.1]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
        ... 12 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JCM1JYBS0DD2GFQH; S3 Extended Request ID: O601BKHby7rLa/ubYoywjKY7FVyqcknKf1IgKh31+84ApW7PvJ8j9fq53et8hA48T5WUouw2pQA=; Proxy: null)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[flink-s3-fs-hadoop-1.13.6.jar:1.13.6]

I have attached screenshots of core-site.xml, the flink sql-client.sh
create hudi table command, flink-conf.yaml, and the error I got.
[image: Screenshot 2022-05-24 at 3.30.15 PM.png]
[image: Screenshot 2022-05-24 at 11.32.40 AM.png]
[image: Screenshot 2022-05-24 at 3.25.01 PM.png]


Can you help us understand what the issue is? I started flink
sql-client.sh with the hudi-flink bundle, and I also changed the s3
bucket policy to allow read/write for everyone (roughly as shown below),
but I don't know what is going wrong. Any help would be really
appreciated.
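
The bucket policy was opened up with something like the following mc
command ("myminio" is a placeholder for the configured MinIO client
alias):

    mc policy set public myminio/test2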

Thanks,
Sambhav  Gupta
(Data Engineer- -Thoughtworks)

Re: Flink HUDI Sink - MinIO S3 access Issue

Posted by sambhav gupta <co...@gmail.com>.
[image: Screenshot 2022-05-24 at 3.28.35 PM.png]

On Tue, May 24, 2022 at 3:32 PM sambhav gupta <co...@gmail.com>
wrote:

> [...]