Posted to user@spark.apache.org by Brandon White <bw...@gmail.com> on 2015/07/24 18:23:19 UTC

Programmatically launch several hundred Spark Streams in parallel

Hello,

So I have about 500 Spark Streams and I want to know the fastest and most
reliable way to process each of them. Right now, I am creating and processing
them in a list:

val ssc = new StreamingContext(sc, Minutes(10))


val streams = paths.par.map { nameAndPath =>
  (path._1, ssc.textFileStream(path._1))
}

streams.par.foreach { nameAndStream =>
  streamTuple.foreachRDD { rdd =>
    df = sqlContext.jsonRDD(rdd)

    df.insertInto(stream._1)
  }
}

ssc.start()



Is this the best way to do this? Are there any better, faster methods?

Re: Programmatically launch several hundred Spark Streams in parallel

Posted by Brandon White <bw...@gmail.com>.
Thanks. Sorry, the last section was supposed to be:

streams.par.foreach { nameAndStream =>
  nameAndStream._2.foreachRDD { rdd =>
    // parse each batch as JSON and append it to this stream's table
    val df = sqlContext.jsonRDD(rdd)

    df.insertInto(nameAndStream._1)
  }
}

ssc.start()


On Fri, Jul 24, 2015 at 10:39 AM, Dean Wampler <de...@gmail.com>
wrote:

> You don't need the "par" (parallel) versions of the Scala collections,
> actually. Recall that you are building a pipeline in the driver, but it
> doesn't start running cluster tasks until ssc.start() is called, at which
> point Spark will figure out the task parallelism. In fact, you might as
> well do the foreachRDD call within the initial map. No need for the streams
> collection, unless you need it for something else. Test it out to make sure
> I'm not wrong ;)
>
> However, I'm a little confused by the per-stream logic. It looks like
> you're using foreachRDD to dump each input stream into the same output
> location "stream._1". True? If it's a directory, you'll get an error that
> it already exists for the *second* stream in "streams". If you're just
> funneling all 500 inputs into the same output location, how about using
> DStream.union to combine all the input streams into one, then have one
> foreachRDD to write output?
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Fri, Jul 24, 2015 at 11:23 AM, Brandon White <bw...@gmail.com>
> wrote:
>
>> Hello,
>>
>> So I have about 500 Spark Streams and I want to know the fastest and most
>> reliable way to process each of them. Right now, I am creating and
>> processing them in a list:
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>>
>> val streams = paths.par.map { nameAndPath =>
>>   (path._1, ssc.textFileStream(path._1))
>> }
>>
>> streams.par.foreach { nameAndStream =>
>>   streamTuple.foreachRDD { rdd =>
>>     df = sqlContext.jsonRDD(rdd)
>>
>>     df.insertInto(stream._1)
>>   }
>> }
>>
>> ssc.start()
>>
>>
>>
>> Is this the best way to do this? Are there any better, faster methods?
>>
>>
>

Re: Programmatically launch several hundred Spark Streams in parallel

Posted by Dean Wampler <de...@gmail.com>.
You don't need the "par" (parallel) versions of the Scala collections,
actually. Recall that you are building a pipeline in the driver, but it
doesn't start running cluster tasks until ssc.start() is called, at which
point Spark will figure out the task parallelism. In fact, you might as
well do the foreachRDD call within the initial map. No need for the streams
collection, unless you need it for something else. Test it out to make sure
I'm not wrong ;)
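
Something like this, untested, and assuming "paths" is a Seq of (tableName,
directory) string pairs (I'm inferring that shape from your snippet):

paths.foreach { case (table, dir) =>
  ssc.textFileStream(dir).foreachRDD { rdd =>
    // one output action per input stream; no intermediate "streams" collection
    sqlContext.jsonRDD(rdd).insertInto(table)
  }
}

ssc.start()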

However, I'm a little confused by the per-stream logic. It looks like
you're using foreachRDD to dump each input stream into the same output
location "stream._1". True? If it's a directory, you'll get an error that
it already exists for the *second* stream in "streams". If you're just
funneling all 500 inputs into the same output location, how about using
DStream.union to combine all the input streams into one, then have one
foreachRDD to write output?
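
For example, again untested, assuming everything lands in a single table
(I'll use "events" as a placeholder name):

val combined = ssc.union(paths.map { case (_, dir) => ssc.textFileStream(dir) })

combined.foreachRDD { rdd =>
  // one write per batch, covering all of the input directories
  sqlContext.jsonRDD(rdd).insertInto("events")
}

ssc.start()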

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Fri, Jul 24, 2015 at 11:23 AM, Brandon White <bw...@gmail.com>
wrote:

> Hello,
>
> So I have about 500 Spark Streams and I want to know the fastest and most
> reliable way to process each of them. Right now, I am creating and
> processing them in a list:
>
> val ssc = new StreamingContext(sc, Minutes(10))
>
>
> val streams = paths.par.map { nameAndPath =>
>   (path._1, ssc.textFileStream(path._1))
> }
>
> streams.par.foreach { nameAndStream =>
>   streamTuple.foreachRDD { rdd =>
>     df = sqlContext.jsonRDD(rdd)
>
>     df.insertInto(stream._1)
>   }
> }
>
> ssc.start()
>
>
>
> Is this the best way to do this? Are there any better, faster methods?
>
>