Posted to user@spark.apache.org by Yana Kadiyska <ya...@gmail.com> on 2014/07/31 16:07:02 UTC

SparkStreaming -- Supported directory structure

Hi,

I'm trying to figure out how to process files under the following
directory structure:

<base_dir>/batch_dir/*.snappy

I have a base directory and every so often a new batch gets dropped
with a bunch of .snappy files as well as a bunch of .xml files. I'd
like to only process the snappy files but can't figure out how to hook
it up with Spark Streaming.

I've tried this, which doesn't work:

ssc.textFileStream("base_dir/batch_dir/*.snappy")

this, which does work but is not quite what I want, since it also picks
up file formats I don't care about:

ssc.textFileStream("base_dir/batch_dir")

and this, which is closest to what I want (watch all batch folders for
new snappy files) but fails with the exception below:

ssc.textFileStream("base_dir//.snappy")

java.io.FileNotFoundException: File hdfs://cdh4-18164-nn/test//.snappy does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)


The path itself definitely exists and is reachable both via the
regular Spark context, sc.textFile(...), and via hadoop fs -ls.

Any help on how to include wildcards for streaming or filter out
unwanted extensions would be much appreciated.
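
For the extension-filtering half of the question, one direction that may
be worth trying is the fileStream overload of StreamingContext that takes
a Path => Boolean filter (textFileStream is a thin wrapper around
fileStream with TextInputFormat). The following is only a sketch under
assumptions: the object name SnappyOnlyStream and the helpers
isSnappyName and snappyLines are made up for illustration, and the filter
only screens files directly inside the watched directory, so it does not
by itself solve watching every batch folder under base_dir.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of Spark.
object SnappyOnlyStream {

  // True only for file names ending in ".snappy" (so .xml files are skipped).
  def isSnappyName(name: String): Boolean = name.endsWith(".snappy")

  // Watch a single directory and emit the lines of new .snappy files only.
  // Assumption: `dir` is one concrete directory, e.g. "base_dir/batch_dir".
  def snappyLines(ssc: StreamingContext, dir: String): DStream[String] =
    ssc.fileStream[LongWritable, Text, TextInputFormat](
        dir,
        (path: Path) => isSnappyName(path.getName),
        newFilesOnly = true)
      .map { case (_, line) => line.toString } // drop the byte-offset key
}
```

It would be used in place of the textFileStream call, for example
SnappyOnlyStream.snappyLines(ssc, "base_dir/batch_dir").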