You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Toy <no...@gmail.com> on 2018/01/22 16:19:05 UTC

How do I extract a value in foreachRDD operation

Hi,

We have a spark application to parse log files and save to S3 in ORC
format. However, during the foreachRDD operation we need to extract a date
field to be able to determine the bucket location; we partition it by date.
Currently, we just hardcode it by current date, but we have a requirement
to determine it for each record.

Here's the current code.

    jsonRows.foreachRDD(r => {
      val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
      val parsedDate = parsedFormat.format(new java.util.Date())
      val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" +
parsedDate

      val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

      val writer =
jsonDf.write.mode("append").format("orc").option("compression", "zlib")

      if (environment.equals("local")) {
        writer.save("/tmp/sparrow")
      } else {
        writer.save(OutputPath)
      }
    })

The column in jsonRow that we want is `_ts`.

Thanks.

Re: [EXT] How do I extract a value in foreachRDD operation

Posted by Toy <no...@gmail.com>.
Thanks Michael,

Can you give me an example? I'm new to Spark

On Mon, 22 Jan 2018 at 12:25 Michael Mansour <Mi...@symantec.com>
wrote:

> Toy,
>
>
>
> I suggest your partition your data according to date, and use the
> forEachPartition function, using the partition as the bucket location.
>
> This would require you to define a custom hash partitioner function, but
> that is not too difficult.
>
>
>
> --
>
> Michael Mansour
>
> Data Scientist
>
> Symantec
>
> *From: *Toy <no...@gmail.com>
> *Date: *Monday, January 22, 2018 at 8:19 AM
> *To: *"user@spark.apache.org" <us...@spark.apache.org>
> *Subject: *[EXT] How do I extract a value in foreachRDD operation
>
>
>
> Hi,
>
>
>
> We have a spark application to parse log files and save to S3 in ORC
> format. However, during the foreachRDD operation we need to extract a date
> field to be able to determine the bucket location; we partition it by date.
> Currently, we just hardcode it by current date, but we have a requirement
> to determine it for each record.
>
>
>
> Here's the current code.
>
>
>
>     jsonRows.foreachRDD(r => {
>
>       val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
>
>       val parsedDate = parsedFormat.format(new java.util.Date())
>
>       val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" +
> parsedDate
>
>
>
>       val jsonDf = sqlSession.read.schema(Schema.schema).json(r)
>
>
>
>       val writer =
> jsonDf.write.mode("append").format("orc").option("compression", "zlib")
>
>
>
>       if (environment.equals("local")) {
>
>         writer.save("/tmp/sparrow")
>
>       } else {
>
>         writer.save(OutputPath)
>
>       }
>
>     })
>
>
>
> The column in jsonRow that we want is `_ts`.
>
>
>
> Thanks.
>

Re: [EXT] How do I extract a value in foreachRDD operation

Posted by Michael Mansour <Mi...@symantec.com>.
Toy,

I suggest your partition your data according to date, and use the forEachPartition function, using the partition as the bucket location.
This would require you to define a custom hash partitioner function, but that is not too difficult.

--
Michael Mansour
Data Scientist
Symantec
From: Toy <no...@gmail.com>
Date: Monday, January 22, 2018 at 8:19 AM
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: [EXT] How do I extract a value in foreachRDD operation

Hi,

We have a spark application to parse log files and save to S3 in ORC format. However, during the foreachRDD operation we need to extract a date field to be able to determine the bucket location; we partition it by date. Currently, we just hardcode it by current date, but we have a requirement to determine it for each record.

Here's the current code.

    jsonRows.foreachRDD(r => {
      val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
      val parsedDate = parsedFormat.format(new java.util.Date())
      val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" + parsedDate

      val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

      val writer = jsonDf.write.mode("append").format("orc").option("compression", "zlib")

      if (environment.equals("local")) {
        writer.save("/tmp/sparrow")
      } else {
        writer.save(OutputPath)
      }
    })

The column in jsonRow that we want is `_ts`.

Thanks.