Posted to user@spark.apache.org by robin_up <ro...@gmail.com> on 2014/02/18 08:25:24 UTC

DStream.saveAsTextFiles() saves nothing

Hi,

I have a Streaming app that reads from inputs, does some text
transformation, and tries to write the output to an HDFS text file using
saveAsTextFiles on the DStream object.

But the method saves nothing (not even an empty file). The jobs run
successfully, i.e. with no errors or warnings. When I replace the
save-to-file part with "print()", it prints the contents to the screen. I
also tried "saveAsTextFile" on a plain SparkContext RDD, and that works.

Not sure why. Has anyone got "saveAsTextFiles" working with DStream?

Here is the line of code I use for output:
actions.saveAsTextFiles("hdfs://nn1:8020/user/ds/actions/test", "test")

I'm using Spark 0.9.0, hadoop2.0.0-cdh4.5.0.

thanks
Robin



-----
-- Robin Li
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-saveAsTextFiles-saves-nothing-tp1666.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
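
One thing that may be worth checking for the call above: the Spark Streaming documentation describes saveAsTextFiles(prefix, suffix) as generating one output name per batch of the form "prefix-TIME_IN_MS[.suffix]". If that rule applies here, nothing is written at the prefix path itself; each batch would appear as a sibling entry under .../actions/. The sketch below only reproduces that naming rule so the expected location can be checked by hand. The helper name batchOutputPath and the batch time are illustrative assumptions, not part of the Spark API or of the original post.

```scala
object OutputNaming {
  // Hypothetical helper mirroring the documented rule
  // "prefix-TIME_IN_MS[.suffix]"; this is not a Spark API.
  def batchOutputPath(prefix: String, suffix: String, batchTimeMs: Long): String =
    if (suffix == null || suffix.isEmpty) s"$prefix-$batchTimeMs"
    else s"$prefix-$batchTimeMs.$suffix"

  def main(args: Array[String]): Unit = {
    // Illustrative batch time in epoch milliseconds.
    val path = batchOutputPath("hdfs://nn1:8020/user/ds/actions/test", "test", 1392751050000L)
    println(path) // hdfs://nn1:8020/user/ds/actions/test-1392751050000.test
  }
}
```

Listing the parent directory (here /user/ds/actions) rather than the prefix path would show whether such per-batch entries exist.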

RE: DStream.saveAsTextFiles() saves nothing

Posted by robin_up <ro...@gmail.com>.
Suraj,

I think your issue is that the code is not detecting new files in the
directory you set, which is a bit different from what I'm facing. When it
detects a new file, you should see something like this in the logs:

14/02/18 19:17:30 INFO JobScheduler: Added jobs for time 1392751050000 ms
14/02/18 19:17:30 INFO JobScheduler: Starting job streaming job
1392751050000 ms.0 from job set of time 1392751050000 ms
14/02/18 19:17:30 WARN Configuration: mapred.output.compress is deprecated.
Instead, use mapreduce.output.fileoutputformat.compress
14/02/18 19:17:30 WARN Configuration: mapred.output.compression.codec is
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
14/02/18 19:17:30 WARN Configuration: mapred.output.compression.type is
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
14/02/18 19:17:30 INFO SparkContext: Starting job: saveAsTextFile at
web_reqeusts.scala:81
14/02/18 19:17:30 INFO SparkContext: Job finished: saveAsTextFile at
web_reqeusts.scala:81, took 2.4977E-5 s
14/02/18 19:17:30 INFO JobScheduler: Finished job streaming job
1392751050000 ms.0 from job set of time 1392751050000 ms
14/02/18 19:17:30 INFO JobScheduler: Total delay: 0.036 s for time
1392751050000 ms (execution: 0.023 s)
14/02/18 19:17:30 INFO FileInputDStream: Cleared 1 old files that were older
than 1392751020000 ms: 1392750990000 ms
14/02/18 19:18:00 INFO FileInputDStream: Finding new files took 8 ms
14/02/18 19:18:00 INFO FileInputDStream: New files at time 1392751080000 ms:
*hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-8f0e55d43a6c43bba48a97d4c448762a.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-ad3925c1ce04450abde4208b731aae1d.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-b9c49509293b459cae0715d5905e6805.sdb.gz*
14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with
curMem=6308257, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_37 stored as values to
memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with
curMem=6478750, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_38 stored as values to
memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with
curMem=6649243, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_39 stored as values to
memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
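
The batch times in these log lines are epoch milliseconds, so converting them makes it easier to line up a batch with the wall-clock timestamps at the start of each line. The following is a small sketch using java.time; the object and method names are made up for illustration. For this excerpt the converted value matches the log timestamp, which suggests the driver clock is logging in UTC.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object BatchTimes {
  private val fmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)

  // Converts a batch time in epoch milliseconds to a UTC timestamp string.
  def toUtc(batchTimeMs: Long): String =
    fmt.format(Instant.ofEpochMilli(batchTimeMs))

  // Gap between two consecutive batch times, in seconds.
  def intervalSeconds(earlierMs: Long, laterMs: Long): Long =
    (laterMs - earlierMs) / 1000

  def main(args: Array[String]): Unit = {
    println(toUtc(1392751050000L))                           // 2014-02-18 19:17:30
    println(intervalSeconds(1392751050000L, 1392751080000L)) // 30
  }
}
```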



-----
-- Robin Li
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-saveAsTextFiles-saves-nothing-tp1666p1690.html

Re: DStream.saveAsTextFiles() saves nothing

Posted by robin_up <ro...@gmail.com>.
Amit,

I'm certain the files were taken in without any issue: the intake file
names are printed in the logs, as in my last reply to Suraj.

I experimented a little further by using RDD.saveAsTextFile() (inside
foreachRDD) instead of DStream.saveAsTextFiles(). It works on the same
dataset, so I'm pretty sure the problem is with DStream.

Can anyone confirm whether this is a bug, or whether I'm using or
understanding it the wrong way?

// This line does not work
actions.saveAsTextFiles("hdfs://nn1:8020/user/etl/rtp_sink/actions/test", "testtest")

// This line works
actions.foreachRDD(rdd =>
  rdd.saveAsTextFile("hdfs://nn1:8020/user/etl/rtp_sink/actions/test",
    classOf[org.apache.hadoop.io.compress.GzipCodec]))

Logs from using DStream.saveAsTextFiles(): new files are detected OK, and
the jobs finish without any error or warning:

14/02/18 19:35:00 INFO FileInputDStream: New files at time 1392752100000 ms:
hdfs://nn1:8020/user/etl/rtp_sink/staging/0134-ae2fc0ed9f824ab1a91f5fd81ad45af3.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0134-f03bd4319bd24f13aa2f7c6b6a0d7631.sdb.gz
14/02/18 19:35:00 INFO MemoryStore: ensureFreeSpace(170493) called with
curMem=6990229, maxMem=9003781324
14/02/18 19:35:00 INFO MemoryStore: Block broadcast_41 stored as values to
memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:35:00 INFO MemoryStore: ensureFreeSpace(170493) called with
curMem=7160722, maxMem=9003781324
14/02/18 19:35:00 INFO MemoryStore: Block broadcast_42 stored as values to
memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:35:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:35:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:35:00 INFO JobScheduler: Added jobs for time 1392752100000 ms
14/02/18 19:35:00 INFO JobScheduler: Starting job streaming job
1392752100000 ms.0 from job set of time 1392752100000 ms
14/02/18 19:35:00 INFO SparkContext: Starting job: saveAsTextFile at
DStream.scala:762
14/02/18 19:35:00 INFO DAGScheduler: Got job 7 (saveAsTextFile at
DStream.scala:762) with 2 output partitions (allowLocal=false)
14/02/18 19:35:00 INFO DAGScheduler: Final stage: Stage 2 (saveAsTextFile at
DStream.scala:762)
14/02/18 19:35:00 INFO DAGScheduler: Parents of final stage: List()
14/02/18 19:35:00 INFO DAGScheduler: Missing parents: List()
14/02/18 19:35:00 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[98] at
saveAsTextFile at DStream.scala:762), which has no missing parents
14/02/18 19:35:00 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2
(MappedRDD[98] at saveAsTextFile at DStream.scala:762)
14/02/18 19:35:00 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:1 as TID 49 on
executor 2: hadoop-dal01-dev-dn8.tapjoy.com (NODE_LOCAL)
14/02/18 19:35:00 INFO TaskSetManager: Serialized task 2.0:1 as 12419 bytes
in 0 ms
14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:0 as TID 50 on
executor 5: hadoop-dal01-dev-dn7.tapjoy.com (NODE_LOCAL)
14/02/18 19:35:00 INFO TaskSetManager: Serialized task 2.0:0 as 12419 bytes
in 0 ms
14/02/18 19:35:26 INFO TaskSetManager: Finished TID 50 in 26508 ms on
hadoop-dal01-dev-dn7.tapjoy.com (progress: 0/2)
14/02/18 19:35:26 INFO DAGScheduler: Completed ResultTask(2, 0)
14/02/18 19:35:30 INFO FileInputDStream: Finding new files took 6 ms
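
A detail in this excerpt: both tasks of the save stage are started (TID 49 and TID 50), but only TID 50 is reported as finished before the excerpt ends, so the save job may still have been running when the output location was checked. A rough way to spot this in a longer log is to pair "Starting task" and "Finished TID" lines. The sketch below assumes the message wording shown in this excerpt; the object and method names are hypothetical.

```scala
object UnfinishedTasks {
  // Patterns based on the TaskSetManager messages quoted in this thread.
  private val Started  = """Starting task \S+ as TID (\d+)""".r
  private val Finished = """Finished TID (\d+)""".r

  // Returns the TIDs that were started but never reported as finished.
  def unfinishedTids(logLines: Seq[String]): Set[Int] = {
    val started =
      logLines.flatMap(l => Started.findFirstMatchIn(l).map(_.group(1).toInt)).toSet
    val finished =
      logLines.flatMap(l => Finished.findFirstMatchIn(l).map(_.group(1).toInt)).toSet
    started -- finished
  }

  def main(args: Array[String]): Unit = {
    val excerpt = Seq(
      "14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:1 as TID 49 on",
      "14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:0 as TID 50 on",
      "14/02/18 19:35:26 INFO TaskSetManager: Finished TID 50 in 26508 ms on"
    )
    println(unfinishedTids(excerpt)) // Set(49)
  }
}
```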



-----
-- Robin Li
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-saveAsTextFiles-saves-nothing-tp1666p1691.html

Re: DStream.saveAsTextFiles() saves nothing

Posted by Amit Behera <am...@gmail.com>.
>
> Hi Robin,
>        Please check whether any new file is being taken as input.
> If that is fine, then please check what exact error is shown on the
> cluster web UI.
>

Thanks  & Regards
Amit Ku. Behera

RE: DStream.saveAsTextFiles() saves nothing

Posted by Suraj Satishkumar Sheth <su...@adobe.com>.
Hi,
I am facing a similar issue.

I am running a Spark Streaming job with a text file stream on HDFS, using Spark 0.9.0 from Cloudera.
I am saving each RDD (the batch interval is 100 seconds) to a different directory in HDFS. Every 100 seconds it creates a new directory in HDFS containing a _SUCCESS file (stream<Random>/_SUCCESS), but it does not write any data/output to it. I verified that I am adding new files to the correct HDFS directory.
So the main issue is that it is not able to recognize new files created in HDFS.

Code used :
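import org.apache.spark.streaming.{Duration, StreamingContext}
import scala.util.Random

// Batch interval of 100 seconds (100000 ms)
val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid", Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)

val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
// Save each batch to its own directory with a random numeric suffix
data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user/<path/to/file/stream>" + Random.nextInt))
ssc.start()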
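
On the detection side, the Spark Streaming programming guide states that files must be placed in the monitored directory by atomically moving or renaming them into it, rather than being written there in place. The sketch below illustrates that pattern on the local filesystem with java.nio as a stand-in; on HDFS the equivalent would be to write the file elsewhere and then rename it into the watched directory (for example with Hadoop's FileSystem.rename). The object name, method name, and directory layout are assumptions for illustration only.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicDrop {
  // Writes the file in stagingDir first, then moves it into watchedDir in a
  // single step, so a directory monitor never observes a half-written file.
  def publish(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
    val tmp = stagingDir.resolve(name)
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    // Both directories live under one temp root so the move stays on one filesystem.
    val root    = Files.createTempDirectory("stream-demo")
    val staging = Files.createDirectory(root.resolve("staging"))
    val watched = Files.createDirectory(root.resolve("watched"))
    val out     = publish(staging, watched, "part-0001.txt", "hello\n")
    println(s"published: $out")
  }
}
```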


It is creating these directories, each containing only _SUCCESS:
stream562343230
stream1228731977
stream318151149
stream603511115


This is the log that I get:
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms: 
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms: 
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)
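
In the log above, each "New files at time ... ms:" line is followed by an empty line, meaning no input files were picked up for that batch, which fits the empty output directories. A quick way to check a longer log is to count the lines listed under each such header. The sketch below assumes the log layout shown in this thread (file names on the lines after the header, each real log line starting with a yy/MM/dd HH:mm:ss timestamp); the object and method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object EmptyBatches {
  private val Header  = """New files at time (\d+) ms:(.*)""".r.unanchored
  private val LogLine = """\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} """.r

  // Returns (batchTimeMs, numberOfFilesListed) for each "New files" entry.
  def filesPerBatch(lines: Seq[String]): Seq[(Long, Int)] = {
    val result = ArrayBuffer.empty[(Long, Int)]
    var i = 0
    while (i < lines.length) {
      lines(i) match {
        case Header(time, rest) =>
          var count = if (rest.trim.nonEmpty) 1 else 0
          i += 1
          // Lines up to the next timestamped log line belong to this file list.
          while (i < lines.length && LogLine.findPrefixOf(lines(i)).isEmpty) {
            if (lines(i).trim.nonEmpty) count += 1
            i += 1
          }
          result += ((time.toLong, count))
        case _ => i += 1
      }
    }
    result.toSeq
  }

  def main(args: Array[String]): Unit = {
    val excerpt = Seq(
      "14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:",
      "",
      "14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms"
    )
    filesPerBatch(excerpt).foreach { case (t, n) => println(s"batch $t: $n new file(s)") }
    // batch 1392626300000: 0 new file(s)
  }
}
```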


Thanks and Regards,
Suraj Sheth
