Posted to dev@spark.apache.org by Aniruddha P Tekade <at...@binghamton.edu> on 2019/10/29 22:43:24 UTC

[DISCUSS] writing structured streaming dataframe to custom S3 buckets?

Hello,

I have a local S3 service that is writable and readable using the AWS SDK
APIs. I created the Spark session and then set the Hadoop configuration as
follows -

// Create Spark Session
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("S3Loaders")
  .config("spark.sql.streaming.checkpointLocation",
"/Users/atekade/checkpoint-s3-loaders/")
  .getOrCreate()

// Take spark context from spark session
val sc = spark.sparkContext

// Configure spark context with S3 values
val accessKey = "00cce9eb2c589b1b1b5b"
val secretKey = "flmheKX9Gb1tTlImO6xR++9kvnUByfRKZfI7LJT8"
val endpoint = "http://s3-region1.mycloudianhyperstore.com:80"

spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)
//    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
//    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)

sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)

And then I am trying to write to S3 as follows -

val query = rawData
  .writeStream
  .format("csv")
  .option("format", "append")
  .option("path", "s3a://bucket0/")
  .outputMode("append")
  .start()

But nothing is actually getting written. Since I am running this from my
local machine, I have an entry mapping the S3 endpoint to its IP address in
my /etc/hosts file. As you can see, this is a streaming dataframe, so it
cannot be written without the writeStream API. Can someone help me figure
out what I am missing here? Is there a better way to do this?

Best,
Aniruddha
-----------


Re: [DISCUSS] writing structured streaming dataframe to custom S3 buckets?

Posted by Steve Loughran <st...@cloudera.com.INVALID>.
> spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

This is some superstition which seems to get carried through Stack Overflow
articles. You do not need to declare the implementation class for s3a://
any more than you have to for HDFS. It's defined in core-default.xml in
hadoop-common. Remove that line.

"fs.s3a.awsAccessKeyId" is not the correct config name - the current names
are fs.s3a.access.key and fs.s3a.secret.key, the ones you have commented
out in your snippet.
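
For completeness, a minimal sketch of the setup with the current property
names (the path-style and SSL settings are my assumptions for a third-party
endpoint like HyperStore, not something from your mail):

val hc = spark.sparkContext.hadoopConfiguration
hc.set("fs.s3a.endpoint", endpoint)
// the names S3A actually reads:
hc.set("fs.s3a.access.key", accessKey)
hc.set("fs.s3a.secret.key", secretKey)
// assumption: third-party stores usually want path-style access
hc.set("fs.s3a.path.style.access", "true")
// assumption: your endpoint is plain http, so turn TLS off
hc.set("fs.s3a.connection.ssl.enabled", "false")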

S3 is an object store and uploads are not manifest until the put is
complete, which happens in close(). Is that what you're seeing here?
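
One other thing worth ruling out: start() returns immediately, so if your
driver exits right after it, no micro-batch ever completes, no close() ever
happens, and nothing shows up in the bucket. A minimal sketch of the write
with the driver blocked until the query stops (the trigger interval is
illustrative, and as far as I can tell .option("format", "append") in your
snippet is a no-op - outputMode("append") is what matters):

import org.apache.spark.sql.streaming.Trigger

val query = rawData            // your streaming dataframe
  .writeStream
  .format("csv")
  .option("path", "s3a://bucket0/")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()       // keep the JVM alive so batches run and files get closed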

Otherwise, set the org.apache.hadoop.fs.s3a log level to DEBUG and see what
it says is going on.
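
If it's easier to flip that from code than to edit log4j.properties,
something like this works against Spark's default log4j backend (an
assumption about your logging setup):

import org.apache.log4j.{Level, Logger}

// trace every request the S3A connector makes against the endpoint
Logger.getLogger("org.apache.hadoop.fs.s3a").setLevel(Level.DEBUG)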

HTH
