Posted to dev@spark.apache.org by Tathagata Das <ta...@gmail.com> on 2014/02/22 08:09:32 UTC

Re: Spark Streaming : Not working with TextFileStream on HDFS

From the log trace, it does not seem to be finding any new files, which
is why it is not creating any output data. Are you sure you are inserting
text files correctly into the directory that you created the
textFileStream on? Does this program work locally, with the local file
system, using the same mechanism of adding files to a local directory?
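
If it helps, a minimal local test could look like the sketch below (the
"local[2]" master, the 10-second batch, and the /tmp paths are made-up
values, not from your setup). The important detail is that each file is
written somewhere else first and then renamed into the watched directory,
since textFileStream only picks up files that appear there atomically with
a fresh modification time:

import java.io.{File, PrintWriter}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalTextStreamTest extends App {
  // two local threads: one to scan the directory, one to run the jobs
  val ssc = new StreamingContext("local[2]", "LocalTextStreamTest", Seconds(10))

  val watched = new File("/tmp/stream-in")   // hypothetical watched directory
  watched.mkdirs()

  val lines = ssc.textFileStream(watched.getAbsolutePath)
  lines.foreachRDD(rdd => println("Lines in this batch: " + rdd.count()))
  ssc.start()

  // write to a staging file first, then rename it into the watched
  // directory so the stream sees a complete file with a new mod time
  val staged = new File("/tmp/stream-staged.txt")
  val out = new PrintWriter(staged)
  out.println("hello streaming")
  out.close()
  staged.renameTo(new File(watched, "data-" + System.currentTimeMillis + ".txt"))

  ssc.awaitTermination()
}

If this prints a non-zero count locally but the HDFS job still sees no new
files, the problem is most likely in how the files are being added to the
HDFS directory.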

TD


On Mon, Feb 17, 2014 at 1:09 AM, Suraj Satishkumar Sheth <surajsat@adobe.com> wrote:

> Hi,
> I am trying a Spark Streaming job with a text file stream on HDFS, using
> Spark 0.9.0 from Cloudera.
> I am saving each RDD (the streaming interval is 100 seconds) to a
> different directory in HDFS. Every 100 seconds it creates a new directory
> in HDFS containing only a _SUCCESS marker (stream<random>/_SUCCESS), but it
> does not add any data/output to it. I verified that I am adding new files
> to the correct HDFS directory. I have also noticed that Spark picks up the
> files initially present in the directory used for streaming, so the
> behaviour looks like batch processing, although at each interval it does
> create a new folder in HDFS with _SUCCESS.
> So, the major issue is that it is not recognizing new files created in
> HDFS.
>
> Code used:
>
> import scala.util.Random
> import org.apache.spark.streaming.{Duration, StreamingContext}
>
> // 100000 ms = the 100-second batch interval
> val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid",
>   Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)
>
> // monitor an HDFS directory for newly created text files
> val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
>
> // save each batch to a new, randomly named HDFS directory
> data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN +
>   "/user<path/to/file>" + Random.nextInt))
>
> ssc.start()
>
>
> It is creating these directories with only _Success :
> stream562343230
> stream1228731977
> stream318151149
> stream603511115
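>
> These presumably come from committing an empty batch: Hadoop writes the
> _SUCCESS marker even when no part files are produced. If needed, the save
> could be guarded to skip empty batches; a sketch (the count() per batch is
> just one possible emptiness check):
>
> data.foreachRDD { rdd =>
>   // count() forces evaluation; save only when the batch has data
>   if (rdd.count() > 0) {
>     rdd.saveAsObjectFile(ClusterConfig.hdfsNN +
>       "/user<path/to/file>" + Random.nextInt)
>   }
> }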
>
>
> This is the log trace I get (INFO only, no errors):
> 14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
> 14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000
> ms:
>
> 14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
> 14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job
> 1392626300000 ms.0 from job set of time 1392626300000 ms
> 14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated.
> Instead, use mapreduce.job.id
> 14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated.
> Instead, use mapreduce.task.id
> 14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated.
> Instead, use mapreduce.task.attempt.id
> 14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated.
> Instead, use mapreduce.task.ismap
> 14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated.
> Instead, use mapreduce.task.partition
> 14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 0.001934866 s
> 14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job
> 1392626300000 ms.0 from job set of time 1392626300000 ms
> 14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time
> 1392626300000 ms (execution: 0.167 s)
> 14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were
> older than 1392626200000 ms:
> 14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
> 14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000
> ms:
>
> 14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
> 14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job
> 1392626400000 ms.0 from job set of time 1392626400000 ms
> 14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 1.9016E-5 s
> 14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job
> 1392626400000 ms.0 from job set of time 1392626400000 ms
> 14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time
> 1392626400000 ms (execution: 0.077 s)
> 14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were
> older than 1392626300000 ms:
> 14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
> 14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000
> ms:
>
> 14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
> 14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job
> 1392626500000 ms.0 from job set of time 1392626500000 ms
> 14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file
> of type (NullWritable,BytesWritable)
> 14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at
> TestStreaming.scala:29
> 14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at
> TestStreaming.scala:29, took 1.8111E-5 s
> 14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job
> 1392626500000 ms.0 from job set of time 1392626500000 ms
> 14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were
> older than 1392626400000 ms: 1392626300000 ms
> 14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time
> 1392626500000 ms (execution: 0.102 s)
>
>
> Thanks and Regards,
> Suraj Sheth
>