Posted to user@spark.apache.org by "Kappaganthu, Sivaram (ES)" <Si...@ADP.com> on 2016/09/15 17:00:39 UTC

Spark Streaming-- for each new file in HDFS

Hello,

I am new to Spark and have the following requirement.

Problem statement: A third-party application continuously dumps files onto a server. Typically there are about 100 files per hour, and each file is smaller than 50 MB. My application has to process those files.

Questions:
1) Is it possible for Spark Streaming to trigger a job as soon as a file is placed, instead of triggering a job at a fixed batch interval?
2) If this is not possible with Spark Streaming, can we control it with Kafka/Flume?

Thanks,
Sivaram

----------------------------------------------------------------------
This message and any attachments are intended only for the use of the addressee and may contain information that is privileged and confidential. If the reader of the message is not the intended recipient or an authorized representative of the intended recipient, you are hereby notified that any dissemination of this communication is strictly prohibited. If you have received this communication in error, notify the sender immediately by return email and delete the message and any attachments from your system.

Re: Spark Streaming-- for each new file in HDFS

Posted by Alonso Isidoro Roman <al...@gmail.com>.
Why isn't Flume an option here?

Alonso Isidoro Roman

2016-10-05 11:14 GMT+02:00 Kappaganthu, Sivaram (ES) <Sivaram.Kappaganthu@adp.com>:

> Hi Franke,
>
> Thanks for your reply. I am trying this, as follows.
>
> The third-party application 1) dumps the original file in one directory
> and 2) writes a .upload file in another directory.
>
> I am writing logic to listen to the directory that contains the .upload files.
>
> Here I need to match the file names across the two directories. Could you
> please suggest how to get the file name in streaming?
>
> val sc = new SparkContext("local[*]", "test")
> val ssc = new StreamingContext(sc, Seconds(4))
> val dStream = ssc.textFileStream(pathOfDirToStream)
> dStream.foreachRDD { eventsRdd => /* How to get the file name */ }

RE: Spark Streaming-- for each new file in HDFS

Posted by "Kappaganthu, Sivaram (ES)" <Si...@ADP.com>.
Hi Franke,

Thanks for your reply. I am trying this, as follows.

The third-party application 1) dumps the original file in one directory and 2) writes a .upload file in another directory.
I am writing logic to listen to the directory that contains the .upload files.

Here I need to match the file names across the two directories. Could you please suggest how to get the file name in streaming?

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[*]", "test")
val ssc = new StreamingContext(sc, Seconds(4))
val dStream = ssc.textFileStream(pathOfDirToStream)
dStream.foreachRDD { eventsRdd => /* How to get the file name */ }
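
For the name-matching part of the question, a minimal sketch follows. It does not touch the streaming API at all; it only shows how a marker file name could be mapped back to the original file in the other directory. It assumes the marker is named "<original file name>.uploaded" (the thread uses both ".upload" and ".uploaded", so the suffix is a guess), and `MarkerNames`, `MarkerSuffix` and `dataFileFor` are hypothetical names introduced here.

```scala
import java.nio.file.{Path, Paths}

object MarkerNames {
  // Assumed suffix; adjust to whatever the third-party application really writes.
  val MarkerSuffix = ".uploaded"

  /** Maps a marker such as /markers/a.csv.uploaded to <dataDir>/a.csv.
    * Returns None when the name does not carry the marker suffix. */
  def dataFileFor(marker: Path, dataDir: Path): Option[Path] =
    Option(marker.getFileName)
      .map(_.toString)
      .filter(n => n.endsWith(MarkerSuffix) && n.length > MarkerSuffix.length)
      .map(n => dataDir.resolve(n.stripSuffix(MarkerSuffix)))
}
```

For example, `MarkerNames.dataFileFor(Paths.get("/markers/a.csv.uploaded"), Paths.get("/data"))` yields the path `/data/a.csv`, while a name without the suffix yields `None`.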



Re: Spark Streaming-- for each new file in HDFS

Posted by Steve Loughran <st...@hortonworks.com>.
On 16 Sep 2016, at 01:03, Peyman Mohajerian <mo...@gmail.com> wrote:

> You can listen to files in a specific directory using:
> Take a look at: http://spark.apache.org/docs/latest/streaming-programming-guide.html
>
> streamingContext.fileStream



Yes, this works.

Here's an example I'm using to test object stores such as S3 and Azure as sources of data:

https://github.com/steveloughran/spark/blob/c2b7d885f91bb447ace8fbac427b2fdf9c84b4ef/cloud/src/main/scala/org/apache/spark/cloud/examples/CloudStreaming.scala#L83

StreamingContext.textFileStream(streamGlobPath.toUri.toString) takes a directory ("/incoming/") or a glob path to directories ("incoming/2016/09/*") and scans it for data.

- It scans every window, looking for files with a modified time within that window.
- You can then just hook up a map to the output, start the ssc, evalu


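      // Stream each line of every new file found under the directory or glob.
      val lines = ssc.textFileStream(streamGlobPath.toUri.toString)

      // Keep lines ending in "3"; `sightings` (defined elsewhere in the linked
      // example) is incremented once per matching line.
      val matches = lines.filter(_.endsWith("3")).map(line => {
        sightings.add(1)
        line
      })

      // Print a sample of each batch, then start the streaming context.
      matches.print()
      ssc.start()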

Once a file has been processed, it will not be scanned again, even if its modtime is updated (ignoring executor failure/restart, and the bits in the code about remember durations). That means updates to a file within a window can be missed.
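
The scanning behaviour described above can be modelled roughly as below. This is a simplified illustration, not Spark's actual implementation: it ignores remember durations and failure recovery, and `WindowScan`, `FileInfo` and `Scanner` are hypothetical names. It only shows why a file is picked up once, and why a later update to the same path is missed.

```scala
object WindowScan {
  // A file as seen by one directory listing: its path and modification time.
  final case class FileInfo(path: String, modTime: Long)

  /** Remembers which paths have already been handed to a batch. */
  final class Scanner {
    private var seen = Set.empty[String]

    /** Selects unseen files whose modTime lies in [windowStart, windowEnd). */
    def select(files: Seq[FileInfo], windowStart: Long, windowEnd: Long): Seq[FileInfo] = {
      val picked = files.filter { f =>
        f.modTime >= windowStart && f.modTime < windowEnd && !seen.contains(f.path)
      }
      seen ++= picked.map(_.path)
      picked
    }
  }
}
```

With this model, a file modified at time 5 is selected by the window [0, 10); if the same path is rewritten at time 15, the window [10, 20) skips it because the path has already been seen.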

If you are writing to files from separate code, it is safest to write elsewhere and then copy/rename the file once complete.
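
A minimal sketch of the write-elsewhere-then-rename approach, using the local file system through java.nio as a stand-in (on HDFS the equivalent would be a rename through the Hadoop file system API). `AtomicPublish`, `publish`, `stagingDir` and `incomingDir` are hypothetical names, and the atomic move assumes both directories live on the same file system.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicPublish {
  /** Writes `content` to a file under `stagingDir`, then renames it into
    * `incomingDir`, so a scanner of `incomingDir` never sees a partial file. */
  def publish(stagingDir: Path, incomingDir: Path, name: String, content: String): Path = {
    val tmp = stagingDir.resolve(name)
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
    // ATOMIC_MOVE fails rather than falling back to a copy if the move cannot be atomic.
    Files.move(tmp, incomingDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

The streaming job would then watch only `incomingDir`; the staging directory is never scanned.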


(Things are slightly complicated by the fact that HDFS doesn't update modtimes until (a) the file is closed or (b) enough data has been written that the write spans a block boundary. That means that incremental writes to HDFS may appear to work, but once you write > 64 MB, or work with a different FS, changes may get lost.)

But it does work, and it lets you glue streaming code onto any workflow that generates its output in files.







Re: Spark Streaming-- for each new file in HDFS

Posted by Peyman Mohajerian <mo...@gmail.com>.
You can listen to files in a specific directory using streamingContext.fileStream.

Take a look at:
http://spark.apache.org/docs/latest/streaming-programming-guide.html


On Thu, Sep 15, 2016 at 10:31 AM, Jörn Franke <jo...@gmail.com> wrote:

> Hi,
> I recommend that the third-party application write an empty file with the
> same filename as the original file, but with the extension ".uploaded". This
> indicates that the file has been fully (!) written to the file system.
> Otherwise you risk reading only part of the file.
> Then you can have a file system listener for this .uploaded file.
>
> Spark Streaming and Kafka are not needed or suitable if the server is a file
> server. You can use Oozie (maybe with a simple custom action) to poll for
> .uploaded files and transmit them.

Re: Spark Streaming-- for each new file in HDFS

Posted by Jörn Franke <jo...@gmail.com>.
Hi,
I recommend that the third-party application write an empty file with the same filename as the original file, but with the extension ".uploaded". This indicates that the file has been fully (!) written to the file system. Otherwise you risk reading only part of the file.
Then you can have a file system listener for this .uploaded file.

Spark Streaming and Kafka are not needed or suitable if the server is a file server. You can use Oozie (maybe with a simple custom action) to poll for .uploaded files and transmit them.
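
As a rough sketch of the polling side of this idea, the code below lists the data files in a directory that already have a marker next to them. It is plain java.nio rather than an Oozie action, it assumes the marker sits in the same directory and is named "<data file name>.uploaded", and `MarkerPoll` and `completedFiles` are hypothetical names.

```scala
import java.io.File
import java.nio.file.{Files, Path}

object MarkerPoll {
  // Assumed convention: the marker is "<data file name>.uploaded".
  val MarkerSuffix = ".uploaded"

  /** Returns the data files in `dir` that have a sibling marker file, sorted by name. */
  def completedFiles(dir: Path): Seq[Path] = {
    // listFiles() returns null if `dir` is not a readable directory.
    val entries = Option(dir.toFile.listFiles()).getOrElse(Array.empty[File])
    entries.toSeq
      .map(_.getName)
      .filter(n => n.endsWith(MarkerSuffix) && n.length > MarkerSuffix.length)
      .map(n => dir.resolve(n.stripSuffix(MarkerSuffix)))
      .filter(p => Files.isRegularFile(p))
      .sortBy(_.toString)
  }
}
```

A scheduler could call `completedFiles` at whatever interval suits the workload; files still being written have no marker yet and are simply skipped until the next poll.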
