Posted to dev@flink.apache.org by "kursat kursatkurt.com" <ku...@kursatkurt.com> on 2022/11/18 19:13:09 UTC

Could not connect Amazon S3 file system with different creds at runtime.

Hi,



I am new to Flink development.

Is there any way to set S3 credentials at runtime?

How can we connect to three or more different S3 buckets (each with different credentials)?

Let's say you have three CSV files on AWS S3 and you want to join them on their id fields.
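
Conceptually, the job I am after looks like the sketch below (bucket and file names are hypothetical, and each source would need its own credentials):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical sketch: three CSV sources in three different buckets,
// each needing different credentials, joined on their id field.
val a = env.readCsvFile[(String, String)]("s3a://bucket-a/a.csv") // creds A
val b = env.readCsvFile[(String, String)]("s3a://bucket-b/b.csv") // creds B
val c = env.readCsvFile[(String, String)]("s3a://bucket-c/c.csv") // creds C

val joined = a.join(b).where(0).equalTo(0)   // join a and b on id
  .map { case (l, r) => (l._1, l._2, r._2) } // flatten the joined pair
  .join(c).where(0).equalTo(0)               // then join the result with c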



How can we do this? I don't want to use flink-conf.yaml or another config file, because the sources can change dynamically, so I need to set the credentials dynamically.



I could not get past the credential check for even a single CSV file; here is the code so you can try it yourself (Scala):



import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object AwsS3CSVTest {

  def main(args: Array[String]): Unit = {
    // Set the credentials on the environment's Configuration; this is the
    // part that does not seem to reach the S3 filesystem factory.
    val conf = new Configuration()
    conf.setString("fs.s3a.access.key", "***")
    conf.setString("fs.s3a.secret.key", "***")

    val env = ExecutionEnvironment.createLocalEnvironment(conf)

    // Read a semicolon-delimited CSV with a header line and six string columns.
    val datafile = env.readCsvFile("s3a://anybucket/anyfile.csv")
      .ignoreFirstLine()
      .fieldDelimiter(";")
      .types(classOf[String], classOf[String], classOf[String],
             classOf[String], classOf[String], classOf[String])

    datafile.print()
  }
}
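
A variant that might be worth trying (a sketch only, and I have not confirmed it works): pushing the credentials into Flink's FileSystem layer directly, before the environment is created, assuming the flink-s3-fs-hadoop dependency is on the classpath. These settings are process-wide, so even if this works it cannot give different buckets different credentials:

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

// Sketch: initialize Flink's filesystem factories with the credentials
// before any s3a:// path is resolved. Process-wide, not per-bucket.
val fsConf = new Configuration()
fsConf.setString("s3.access-key", "***")
fsConf.setString("s3.secret-key", "***")
FileSystem.initialize(fsConf)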



I also asked this on Stack Overflow:



https://stackoverflow.com/questions/74482619/apache-flink-s3-file-system-credentials-does-not-work/



For what it's worth, I know I can do this with Spark: you can access the Hadoop configuration and set the credentials at runtime:



  def getAwsS3DF = {
    // SparkFactory is our own helper that returns the SparkSession.
    val ss = SparkFactory.getSparkSession

    // Spark exposes the Hadoop configuration at runtime, so the S3A
    // credentials can be set programmatically.
    ss.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "xxx")
    ss.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "xxx")

    val df = ss.read.format("csv")
      .option("header", true)
      .option("sep", "\t")
      .load("s3a://anybucket/anyfile.csv")

    df.show()
  }
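
As far as I know, because Spark hands you the Hadoop configuration directly, you can even use Hadoop S3A's per-bucket keys to give each bucket its own credentials (bucket names are hypothetical):

// Per-bucket S3A configuration: keys scoped to a bucket name override
// the global fs.s3a.* values for that bucket only.
ss.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.bucket-a.access.key", "xxx")
ss.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.bucket-a.secret.key", "xxx")
ss.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.bucket-b.access.key", "yyy")
ss.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.bucket-b.secret.key", "yyy")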



So, is there something I am missing, or is it not possible?



Thank you.


RE: Could not connect Amazon S3 file system with different creds at runtime.

Posted by "kursat kursatkurt.com" <ku...@kursatkurt.com>.
Thank you, Martijn; I am watching the issue.

Regards,
Kursat

-----Original Message-----
From: Martijn Visser <ma...@apache.org> 
Sent: Saturday, November 19, 2022 01:01
To: dev@flink.apache.org
Subject: Re: Could not connect Amazon S3 file system with different creds at runtime.

Hi,

As I mentioned on Stack Overflow, it's currently not possible. See
https://issues.apache.org/jira/browse/FLINK-19589

Best regards,

Martijn

Re: Could not connect Amazon S3 file system with different creds at runtime.

Posted by Martijn Visser <ma...@apache.org>.
Hi,

As I mentioned on Stack Overflow, it's currently not possible. See
https://issues.apache.org/jira/browse/FLINK-19589

Best regards,

Martijn
