Posted to issues@spark.apache.org by "Mahbub Murshed (JIRA)" <ji...@apache.org> on 2018/06/28 02:15:00 UTC

[jira] [Created] (SPARK-24670) How to stream only newer files from a folder in Apache Spark?

Mahbub Murshed created SPARK-24670:
--------------------------------------

             Summary: How to stream only newer files from a folder in Apache Spark?
                 Key: SPARK-24670
                 URL: https://issues.apache.org/jira/browse/SPARK-24670
             Project: Spark
          Issue Type: Question
          Components: Input/Output, Structured Streaming
    Affects Versions: 2.3.0
            Reporter: Mahbub Murshed


Background:
I have a directory in Google Cloud Storage containing about 1.5 years of data. The files are named hits_<DATE>_<COUNT>.csv. For example, June 24 might have three files: hits_20180624_000.csv, hits_20180624_001.csv, and hits_20180624_002.csv. The folder holds files going back to January 2017, and new files are dropped into it every day.
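
To make the naming scheme concrete, here is a minimal sketch of selecting only the newer files by the date embedded in each name. It is plain Python, not a Spark API; `file_date`, `newer_files`, and the cutoff date are hypothetical names chosen for illustration.

```python
import re
from datetime import date, datetime
from typing import List, Optional

# Matches names like hits_20180624_000.csv (pattern taken from the description above).
HITS_NAME = re.compile(r"^hits_(\d{8})_(\d+)\.csv$")


def file_date(name: str) -> Optional[date]:
    """Return the date embedded in a hits file name, or None if the name does not match."""
    match = HITS_NAME.match(name)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y%m%d").date()
    except ValueError:
        # Eight digits that do not form a valid calendar date.
        return None


def newer_files(names: List[str], cutoff: date) -> List[str]:
    """Keep only files dated on or after the cutoff, preserving input order."""
    selected = []
    for name in names:
        day = file_date(name)
        if day is not None and day >= cutoff:
            selected.append(name)
    return selected
```

A listing filtered this way could then be handed to the job as explicit input paths. Whether that suits a streaming source is an open question here, not something this sketch settles.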

I am reading the files with Spark Structured Streaming and writing the output to AWS S3.

Problem:
For the first batch, Spark processes ALL files in the folder. Processing the entire set will take about a month.

Moreover, Spark does not finish writing out each day's data until the entire folder has been processed.

Example:
Say each input file contains 100,000 records.
Input:
hits_20180624_000.csv
hits_20180624_001.csv
hits_20180624_002.csv
hits_20180623_000.csv
hits_20180623_001.csv
...
hits_20170101_000.csv
hits_20170101_001.csv

Processing:
The job drops about half of the records (say), so the output for each day should contain 50,000 records.

Expected output (the number of files may differ):
year=2018/month=6/day=24/hash0.parquet
year=2018/month=6/day=24/hash1.parquet
year=2018/month=6/day=24/hash2.parquet
year=2018/month=6/day=23/hash0.parquet
year=2018/month=6/day=23/hash1.parquet
...
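
The partition layout above follows directly from the date in each file name. A minimal sketch, assuming the directories use unpadded year, month, and day values as in the listing; `partition_path` is a hypothetical helper, not part of Spark:

```python
from datetime import date


def partition_path(day: date) -> str:
    """Build the year=/month=/day= directory prefix for one day of output."""
    # Values are not zero-padded, matching the listing (month=6, not month=06).
    return f"year={day.year}/month={day.month}/day={day.day}"
```

For example, `partition_path(date(2018, 6, 24))` gives the prefix under which the hash0.parquet, hash1.parquet, and hash2.parquet files for June 24 are expected.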

Problem:
Each day contains fewer than 50,000 records until the entire batch is complete. This behavior was reproduced in a test with a small subset.

Question:
Is there a way to configure Spark not to load older files, even on the first load? And why is Spark not writing out the remaining records?

Things I tried:
1. A trigger of 1 hour
2. Watermarking based on event time
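
Another avenue that may be worth trying is the streaming file source's own options. The sketch below uses `latestFirst` and `maxFilesPerTrigger`, which are documented options of the file source. The value 10, the helper names `file_source_options` and `build_stream`, and the schema and path arguments are assumptions for illustration only.

```python
from typing import Dict


def file_source_options(max_files_per_trigger: int = 10) -> Dict[str, str]:
    """Build options for Spark's streaming file source (values are illustrative)."""
    return {
        # Pick up the most recently modified files first.
        "latestFirst": "true",
        # Cap the number of new files read in each micro-batch.
        "maxFilesPerTrigger": str(max_files_per_trigger),
    }


def build_stream(spark, schema, path: str):
    """Hypothetical helper: apply the options to a CSV streaming read.

    Needs a live SparkSession and a user-supplied schema; it is not called here.
    """
    reader = spark.readStream.schema(schema)
    for key, value in file_source_options().items():
        reader = reader.option(key, value)
    return reader.csv(path)
```

These options reorder and throttle the backlog; they do not exclude it. According to the Structured Streaming guide, `maxFileAge` treats all files as valid for the first batch and is ignored when `latestFirst` and `maxFilesPerTrigger` are both set. Smaller micro-batches might let each day's output appear sooner, but that has not been verified for this job.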

[1]: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala


