Posted to dev@flink.apache.org by "kursat kursatkurt.com" <ku...@kursatkurt.com> on 2022/11/18 19:13:09 UTC
Could not connect Amazon S3 file system with different creds at runtime.
Hi;
I am new to Flink development.
Is there any way to set S3 credentials at runtime?
How can we connect to 3 or more different S3 buckets (with different creds)?
Let's say you have 3 CSV files on AWS S3 and you want to join them on their id fields.
How can we do this? I don't want to use the flink-conf.yaml file or any other config file,
because the sources can change dynamically, so I need to set the creds dynamically.
I could not get past the credential check for even one CSV file; here is the code you can try (Scala):
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object AwsS3CSVTest {

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setString("fs.s3a.access.key", "***")
    conf.setString("fs.s3a.secret.key", "***")
    val env = ExecutionEnvironment.createLocalEnvironment(conf)
    val datafile = env.readCsvFile("s3a://anybucket/anyfile.csv")
      .ignoreFirstLine()
      .fieldDelimiter(";")
      .types(classOf[String], classOf[String], classOf[String],
             classOf[String], classOf[String], classOf[String])
    datafile.print()
  }
}
I also asked this on Stack Overflow, for reference:
https://stackoverflow.com/questions/74482619/apache-flink-s3-file-system-credentials-does-not-work/
I want to mention that I know I can do this with Spark: you can access the Hadoop Configuration and set the creds at runtime:
def getAwsS3DF = {
  val ss = SparkFactory.getSparkSession
  ss.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "xxx")
  ss.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "xxx")
  val df = ss.read.format("csv")
    .option("header", true)
    .option("sep", "\t")
    .load("s3a://anybucket/anyfile.csv")
  df.show
}
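For the multi-bucket part specifically, Hadoop's S3A connector supports per-bucket configuration keys (fs.s3a.bucket.<bucket>.*), so on the Spark/Hadoop side something like the following sketch could give each bucket its own credentials. The bucket names bucket-a and bucket-b and the variable hc are hypothetical and only for illustration:

// Sketch only: per-bucket S3A configuration (a Hadoop S3A feature, not Flink's).
// "bucket-a" and "bucket-b" are made-up bucket names.
val hc = ss.sparkContext.hadoopConfiguration
hc.set("fs.s3a.bucket.bucket-a.access.key", "xxx")
hc.set("fs.s3a.bucket.bucket-a.secret.key", "xxx")
hc.set("fs.s3a.bucket.bucket-b.access.key", "yyy")
hc.set("fs.s3a.bucket.bucket-b.secret.key", "yyy")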
So, is there anything I am missing, or is it not possible?
Thank you.
RE: Could not connect Amazon S3 file system with different creds at runtime.
Posted by "kursat kursatkurt.com" <ku...@kursatkurt.com>.
Thank you Martijn, I am watching the issue.
Regards,
Kursat
-----Original Message-----
From: Martijn Visser <ma...@apache.org>
Sent: Saturday, November 19, 2022 01:01
To: dev@flink.apache.org
Subject: Re: Could not connect Amazon S3 file system with different creds at runtime.
Hi,
Like I mentioned on Stack Overflow, it's currently not possible. See
https://issues.apache.org/jira/browse/FLINK-19589
Best regards,
Martijn
Re: Could not connect Amazon S3 file system with different creds at runtime.
Posted by Martijn Visser <ma...@apache.org>.
Hi,
Like I mentioned on Stack Overflow, it's currently not possible. See
https://issues.apache.org/jira/browse/FLINK-19589
Best regards,
Martijn
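For context on the current, non-dynamic approach: Flink's S3 filesystem plugins pick up credentials from the cluster configuration that the filesystem factories are initialized with, not from per-job or per-bucket settings. A minimal sketch of that, assuming flink-s3-fs-hadoop is on the classpath, using the documented s3.access-key / s3.secret-key keys; the object name GlobalS3CredsSketch is just illustrative, and these keys apply process-wide rather than per bucket, which is exactly the gap FLINK-19589 tracks:

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

object GlobalS3CredsSketch {

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Credentials for Flink's S3 filesystem plugins; they apply to every
    // s3:// / s3a:// path the process touches, so different creds per
    // bucket are not possible this way.
    conf.setString("s3.access-key", "***")
    conf.setString("s3.secret-key", "***")
    // Re-initialize the filesystem factories with this configuration before
    // any S3 path is resolved (null = no plugin manager, i.e. the S3
    // filesystem jar sits directly on the classpath).
    FileSystem.initialize(conf, null)
    // ... create the ExecutionEnvironment and read the CSV files as before ...
  }
}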