You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Suraj Satishkumar Sheth <su...@adobe.com> on 2014/02/17 10:09:55 UTC

Spark Streaming : Not working with TextFileStream on HDFS

Hi,
I am trying a Spark Streaming job with a Text File Stream on HDFS with Spark 0.9.0 from cloudera. 
I am saving the RDD (100 seconds is streaming frequency) to HDFS in a different directory. Every 100 seconds, it is creating a new directory in HDFS with _Success(stream-Random/_Success). But, it is not adding any data/output to it. I verified that I am adding new files to the correct HDFS directory. Another change I have noticed is that, Spark picks up the initial files present in the directory used for streaming. The behaviour seems like Batch processing. Although, at specified interval, it does create a new folder in HDFS with _Success.
So, the major issue is that it is not able to recognize new files created in HDFS.

Code used :
val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid", Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)
   
 val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
 data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user<path/to/file>" + Random.nextInt))
 ssc.start


It is creating these directories with only _Success : 
stream562343230
stream1228731977
stream318151149
stream603511115


This is the error stack I get :
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms: 
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms: 
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)


Thanks and Regards,
Suraj Sheth

Re: Spark Streaming : Not working with TextFileStream on HDFS

Posted by Tathagata Das <ta...@gmail.com>.
For the log trace, it does not seem like it is finding any new file, which
is why it is not creating any output data. Are you sure your inserting text
files correctly into the directory that you have created textStream on?
Does this program work locally with local file system using the same
mechanism of adding files to a local directory?

TD


On Mon, Feb 17, 2014 at 1:09 AM, Suraj Satishkumar Sheth <surajsat@adobe.com
> wrote:

> Hi,
> I am trying a Spark Streaming job with a Text File Stream on HDFS with
> Spark 0.9.0 from cloudera.
> I am saving the RDD (100 seconds is streaming frequency) to HDFS in a
> different directory. Every 100 seconds, it is creating a new directory in
> HDFS with _Success(stream-Random/_Success). But, it is not adding any
> data/output to it. I verified that I am adding new files to the correct
> HDFS directory. Another change I have noticed is that, Spark picks up the
> initial files present in the directory used for streaming. The behaviour
> seems like Batch processing. Although, at specified interval, it does
> create a new folder in HDFS with _Success.
> So, the major issue is that it is not able to recognize new files created
> in HDFS.
>
> Code used :
> val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid",
> Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)
>
>  val data = ssc.textFileStream(ClusterConfig.hdfsNN +
> "correct/path/to/data")
>  data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN +
> "/user<path/to/file>" + Random.nextInt))
>  ssc.start
>
>
> It is creating these directories with only _Success :
> stream562343230
> stream1228731977
> stream318151149
> stream603511115
>
>
> This is the error stack I get :
> 14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
> 14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000
> ms:
>
> 14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
> 14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job
> 1392626300000 ms.0 from job set of time 1392626300000 ms
> 14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated.
> Instead, use mapreduce.job.id
> 14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated.
> Instead, use mapreduce.task.id
> 14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated.
> Instead, use mapreduce.task.attempt.id
> 14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated.
> Instead, use mapreduce.task.ismap
> 14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated.
> Instead, use mapreduce.task.partition
> 14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 0.001934866 s
> 14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job
> 1392626300000 ms.0 from job set of time 1392626300000 ms
> 14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time
> 1392626300000 ms (execution: 0.167 s)
> 14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were
> older than 1392626200000 ms:
> 14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
> 14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000
> ms:
>
> 14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
> 14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job
> 1392626400000 ms.0 from job set of time 1392626400000 ms
> 14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 1.9016E-5 s
> 14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job
> 1392626400000 ms.0 from job set of time 1392626400000 ms
> 14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time
> 1392626400000 ms (execution: 0.077 s)
> 14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were
> older than 1392626300000 ms:
> 14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
> 14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000
> ms:
>
> 14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
> 14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job
> 1392626500000 ms.0 from job set of time 1392626500000 ms
> 14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 1.8111E-5 s
> 14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job
> 1392626500000 ms.0 from job set of time 1392626500000 ms
> 14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were
> older than 1392626400000 ms: 1392626300000 ms
> 14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time
> 1392626500000 ms (execution: 0.102 s)
>
>
> Thanks and Regards,
> Suraj Sheth
>