Posted to user@spark.apache.org by Paul Corley <pa...@ignitionone.com> on 2017/05/25 14:13:05 UTC

Structured Streaming from Parquet

I have a Spark Structured Streaming process that is implemented in 2 separate streaming apps.

The first app reads .gz files in from S3 (which range in size from 1GB to 9GB compressed), filters out invalid records, repartitions the data, and outputs Parquet to S3, partitioned the same way the stream is partitioned. This process produces thousands of files which other processes consume. The thinking behind this approach was to (a sketch of the pipeline follows this list):

1)       Break each file down into smaller, more easily consumed pieces

2)       Allow more parallelism in the processes that consume the data

3)       Allow multiple downstream processes to consume data that has already:

a.       had bad records filtered out

b.       been split up, so consumers do not have to fully read in such large files
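Roughly, App 1's pipeline looks like the following minimal sketch. The bucket names, the Record type, and the parsing/validity rules are placeholder assumptions, not the actual code, which was not posted.

// Hedged sketch of App 1: read gzipped text from S3, drop invalid records,
// repartition, and write Parquet partitioned the same way.
import sparkSession.implicits._

case class Record(raw: String, expireDate: String)

val cleaned = sparkSession
  .readStream
  .textFile("s3://source-bucket/incoming/")              // gzipped text is decompressed automatically
  .filter(line => line.nonEmpty && line.contains("|"))   // stand-in for the real validity check
  .map(line => Record(line, line.split('|')(1).take(8))) // stand-in for the real parsing logic
  .repartition($"expireDate")                             // match the output partitioning

cleaned.writeStream
  .format("parquet")
  .partitionBy("expireDate")
  .option("checkpointLocation", "s3://source-bucket/checkpoints/app1")
  .option("path", "s3://source-bucket/cleaned/")
  .start()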

The second application reads in the files produced by the first app. It then reformats the data (a sketch of this reshaping follows the example below), turning a row like:

12NDSIN|20170101:123313, 5467;20170115:987

into:
12NDSIN, 20170101, 123313
12NDSIN, 20170101, 5467
12NDSIN, 20170115, 987
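A minimal sketch of that reshaping, assuming each input row arrives as a single string column in the layout id|date1:v1, v2;date2:v3; the column and path names are placeholders, since the actual App 2 code was not posted.

// Hedged sketch of the App 2 reshaping.
import sparkSession.implicits._

val exploded = sparkSession
  .readStream
  .schema(rawSchema)                              // assumed: rawSchema is a single string column
  .parquet("s3://source-bucket/cleaned/")
  .as[String]
  .flatMap { row =>
    val Array(id, groups) = row.split('|')        // "12NDSIN" | "20170101:123313, 5467;20170115:987"
    groups.split(';').flatMap { group =>
      val Array(date, values) = group.split(':')  // "20170101" : "123313, 5467"
      values.split(',').map(v => (id, date, v.trim))
    }
  }
  .toDF("id", "date", "value")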

App 1 runs with no problems and churns through the files in its source directory on S3; total processing time for a file is < 10 min. App 2 is the one having issues.

The source is defined as
val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))   <=====Line85

output is defined as
val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation",config.getString("spark.app.checkpoint_dir") + "/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
  .awaitTermination()

If files exist from App 1, App 2 enters a cycle of just spinning on the stage parquet at ProcessFromSource.scala:85 <http://ip-10-205-68-107.ec2.internal:18080/history/application_1491337161441_4439/stages/stage?id=78&attempt=0> (3999/3999).

If there are only a few files output from App 1, App 2 eventually enters the stage where it actually processes the data and begins to output, but the more files App 1 produces, the longer this takes, if it ever completes at all. With an extremely large number of files the app eventually throws a Java OOM error. Additionally, each cycle through this step takes successively longer.

Hopefully someone can lend some insight into what is actually taking place in this step and how to alleviate it.



Thanks,

Paul Corley | Principal Data Engineer

Re: Structured Streaming from Parquet

Posted by upendra 1991 <up...@yahoo.com.INVALID>.
Paul,
Did you try writing to disk rather than keeping things in memory? When the files are large, depending on whether you want to prioritize quality (performance) or quantity, writing to disk would take the load off the executors and let App 2 reach the stage where it formats your data.
Another option is to use a Kafka sink: write from Spark App 1 to the sink, and Spark App 2 would then be able to process the data as it comes in (a sketch is below). Performance of App 2 should also be better in this case.
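A minimal sketch of that Kafka approach using Structured Streaming's built-in Kafka source and sink. The topic, broker list, and the "cleaned" dataset (from the earlier hypothetical App 1 sketch) are assumptions.

// Hedged sketch: App 1 writes cleaned rows to Kafka instead of Parquet,
// and App 2 reads them as they arrive.

// App 1: write to a Kafka sink (the payload must be in a "value" column).
cleaned
  .selectExpr("CAST(raw AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("topic", "cleaned-records")
  .option("checkpointLocation", "s3://source-bucket/checkpoints/app1-kafka")
  .start()

// App 2: read from the same topic and process records as they come in.
val fromKafka = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "cleaned-records")
  .load()
  .selectExpr("CAST(value AS STRING) AS raw")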
Thanks,
Upendra M | Data Platform Engineer
 

Re: Structured Streaming from Parquet

Posted by Burak Yavuz <br...@gmail.com>.
Hi Paul,

From what you're describing, it seems that stream1 is possibly generating
tons of small files and stream2 is OOMing because it tries to maintain an
in-memory list of files. Some notes/questions:

 1. Parquet files are splittable, therefore having large parquet files
shouldn't be a problem. The larger a parquet file is, the longer the write
process will take, but the read path shouldn't be adversely affected.
 2. How many partitions are you writing out to?
 3. In order to reduce the number of files, you may call
`repartition(partitionColumns).writeStream.partitionBy(partitionColumns)`
so that on every trigger you output only 1 file per partition (a sketch of
this applied to App 1 is below). After some time, you may want to compact
files if you don't partition by date.
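For concreteness, a minimal sketch of that suggestion applied to App 1's write, reusing the hypothetical names and paths from the earlier App 1 sketch and assuming expireDate is the only partition column.

// Hedged sketch: repartition on the partition column before the partitioned
// write, so each trigger emits roughly one file per expireDate value.
cleaned
  .repartition($"expireDate")
  .writeStream
  .format("parquet")
  .partitionBy("expireDate")
  .option("checkpointLocation", "s3://source-bucket/checkpoints/app1")
  .option("path", "s3://source-bucket/cleaned/")
  .start()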

Best,
Burak


