Posted to user@spark.apache.org by Michael Mansour <Mi...@symantec.com> on 2018/01/22 17:25:41 UTC
Re: [EXT] How do I extract a value in foreachRDD operation
Toy,
I suggest you partition your data by date and use the forEachPartition function, using the partition as the bucket location.
This would require you to define a custom hash partitioner, but that is not too difficult.
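A rough sketch of this approach, with `DatePartitioner` and `extractDate` as illustrative names rather than code from the thread:

```scala
import org.apache.spark.Partitioner

// Sketch only: assumes the distinct dates in the batch are collected up
// front so each "yyyy-MM-dd" key can map to its own partition.
class DatePartitioner(dates: Seq[String]) extends Partitioner {
  private val index = dates.zipWithIndex.toMap
  override def numPartitions: Int = dates.size
  override def getPartition(key: Any): Int = index(key.asInstanceOf[String])
}

// Inside foreachRDD: key each record by its date, repartition so every
// partition holds exactly one date, then write partition by partition.
// (`extractDate` is a hypothetical helper returning "yyyy-MM-dd".)
//
// val byDate = rdd.keyBy(extractDate)                    // (dt, record)
// val dates  = byDate.keys.distinct().collect().toSeq
// byDate.partitionBy(new DatePartitioner(dates))
//   .foreachPartition { records =>
//     records.toSeq.headOption.foreach { case (dt, _) =>
//       val path = destinationBucket + "/parsed_logs/orc/dt=" + dt
//       // write this partition's records under `path`
//     }
//   }
```

The collect of distinct dates is a small driver-side step; per-day log data usually has only a handful of distinct dates per batch, so this is cheap.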
--
Michael Mansour
Data Scientist
Symantec
From: Toy <no...@gmail.com>
Date: Monday, January 22, 2018 at 8:19 AM
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: [EXT] How do I extract a value in foreachRDD operation
Hi,
We have a Spark application that parses log files and saves them to S3 in ORC format. During the foreachRDD operation we need to extract a date field to determine the bucket location, since we partition by date. Currently we hard-code it to the current date, but we have a requirement to determine it for each record.
Here's the current code.
jsonRows.foreachRDD(r => {
  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" + parsedDate

  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)
  val writer = jsonDf.write.mode("append").format("orc").option("compression", "zlib")

  if (environment.equals("local")) {
    writer.save("/tmp/sparrow")
  } else {
    writer.save(OutputPath)
  }
})
The column in jsonRow that we want is `_ts`.
Thanks.
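Since the data already passes through a DataFrame, one hedged alternative for using `_ts` per record, a sketch and not anything proposed in the thread, is to derive a dt column and let Spark's DataFrameWriter.partitionBy split the output by it:

```scala
import org.apache.spark.sql.functions.{col, substring}

// Sketch only: assumes `_ts` is an ISO-8601 string such as
// "2018-01-22T16:19:00Z", so its first 10 characters are the date.
val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

jsonDf
  .withColumn("dt", substring(col("_ts"), 1, 10)) // per-record date
  .write
  .mode("append")
  .format("orc")
  .option("compression", "zlib")
  .partitionBy("dt")   // writes .../parsed_logs/orc/dt=2018-01-22/ etc.
  .save(destinationBucket + "/parsed_logs/orc")
```

This avoids a custom partitioner entirely: each record lands in the dt= directory matching its own timestamp rather than the driver's clock.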
Re: [EXT] How do I extract a value in foreachRDD operation
Posted by Toy <no...@gmail.com>.
Thanks Michael,
Can you give me an example? I'm new to Spark.