Posted to user@spark.apache.org by Santiago Mola <sm...@stratio.com> on 2014/09/15 09:32:50 UTC

Re: Developing a spark streaming application

Just for the record, this is being discussed on Stack Overflow:

http://stackoverflow.com/questions/25663026/developing-a-spark-streaming-application/25766618

2014-08-27 10:28 GMT+02:00 Filip Andrei <an...@gmail.com>:

> Hey guys, so the problem I'm trying to tackle is the following:
>
> - I need a data source that emits messages at a certain frequency
> - There are N neural nets that need to process each message individually
> - The outputs from all neural nets are aggregated, and a message should be
> declared fully processed only when all N of its outputs have been collected
> - At the end, I should measure the time it took for a message to be fully
> processed (the time between when it was emitted and when all N neural net
> outputs for that message have been collected)
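Put as a formula, with t_emit(m) the time message m is emitted (the timeOfProc field in the implementation) and t_out(i, m) the time the output of net i for message m is collected, the quantity to measure is the following. Because the barrier only fires when the last of the N outputs arrives, the time it observes corresponds to the maximum term:

```latex
T_{\mathrm{full}}(m) = \max_{1 \le i \le N} t_{\mathrm{out}}(i, m) \; - \; t_{\mathrm{emit}}(m)
```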
>
>
> What I'm mostly interested in is whether I approached the problem correctly
> in the first place and, if so, some best-practice pointers on my approach.
>
> And my current implementation is the following:
>
> For the data source I created the class
>
>     public class JavaRandomReceiver extends Receiver<Map<String, Object>>
>
> I chose Map<String, Object> as the element type because a key-value store
> seemed best suited to holding the emitted data.
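One possible pointer here, sketched under assumptions: a small immutable, Serializable value class in place of Map<String, Object> gives compile-time types and avoids casting every field on the way out. The class name SensorMessage, the long timestamps and the double[] payload are all assumptions (the original payload type is not shown); the field names mirror the map keys used in the receiver.

```java
import java.io.Serializable;

// Hypothetical typed replacement for Map<String, Object>. Field names mirror the
// map keys "msgNo", "sensorTime", "list" and "timeOfProc"; the types are assumed.
final class SensorMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long msgNo;        // sequence number assigned by the generator
    private final long sensorTime;   // sample time reported by the sensor (assumed long)
    private final double[] payload;  // net input, or net output after mapping (assumed double[])
    private final long timeOfProc;   // emission timestamp in milliseconds

    SensorMessage(long msgNo, long sensorTime, double[] payload, long timeOfProc) {
        this.msgNo = msgNo;
        this.sensorTime = sensorTime;
        this.payload = payload.clone(); // defensive copy keeps the object immutable
        this.timeOfProc = timeOfProc;
    }

    // Returns a new message with a different payload, keeping msgNo and timeOfProc,
    // which is what each neural net output needs for the aggregation step.
    SensorMessage withPayload(double[] newPayload) {
        return new SensorMessage(msgNo, sensorTime, newPayload, timeOfProc);
    }

    long getMsgNo() { return msgNo; }
    long getSensorTime() { return sensorTime; }
    double[] getPayload() { return payload.clone(); } // callers cannot mutate internal state
    long getTimeOfProc() { return timeOfProc; }
}
```

The defensive copies matter because Spark may hand the same object to more than one transformation; an immutable record cannot be corrupted that way.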
>
>
> The onStart() method initializes a custom random sequence generator and
> starts a thread that continuously generates new neural net inputs and stores
> them as follows:
>
>     SensorData sdata = generator.createSensorData();
>
>     Map<String, Object> result = new HashMap<String, Object>();
>
>     result.put("msgNo", sdata.getMsgNo());
>     result.put("sensorTime", sdata.getSampleTime());
>     result.put("list", sdata.getPayload());
>     result.put("timeOfProc", sdata.getCreationTime());
>
>     store(result);
>
>     // sleeps for a given amount of time set at generator creation
>     generator.waitForNextTuple();
>
> The msgNo here is incremented for each newly created message and is used to
> keep track of which neural net outputs belong to which message.
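The generation loop can be tried outside Spark. In this sketch, which only assumes the loop shape described above, the sink callback stands in for Receiver.store(...) and the stop condition stands in for Receiver.isStopped(); the class name EmitterSketch, the random double[] payload and the fixed-delay sleep are illustrative assumptions, not the original generator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical stand-alone version of the receiver's generation thread.
final class EmitterSketch {
    private final Random random = new Random();
    private long nextMsgNo = 0; // only touched by the generating thread

    // Emits one message every periodMillis until stopped reports true.
    // sink plays the role of Receiver.store(...), stopped of Receiver.isStopped().
    void run(Consumer<Map<String, Object>> sink, BooleanSupplier stopped,
             long periodMillis, int payloadSize) {
        while (!stopped.getAsBoolean()) {
            long now = System.currentTimeMillis();
            double[] payload = new double[payloadSize];
            for (int i = 0; i < payloadSize; i++) {
                payload[i] = random.nextDouble();
            }

            Map<String, Object> result = new HashMap<>();
            result.put("msgNo", nextMsgNo++); // unique, increasing per message
            result.put("sensorTime", now);
            result.put("list", payload);
            result.put("timeOfProc", now);    // emission time used for latency
            sink.accept(result);

            try {
                Thread.sleep(periodMillis);   // fixed delay between messages
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and stop
                return;
            }
        }
    }
}
```

A fixed sleep after each message makes the real period slightly longer than periodMillis; if the emission rate matters for the measurements, ScheduledExecutorService.scheduleAtFixedRate(...) avoids that drift.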
>
>
> The neural net functionality is added by creating a custom mapper
>
>     public class NeuralNetMapper implements Function<Map<String, Object>,
>             Map<String, Object>>
>
> whose call() method takes the input map, feeds its "list" entry to the
> neural net as input, replaces the map's original list with the neural net
> output, and returns the modified map.
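A small variation on the mapper, sketched under assumptions: return a new map instead of modifying the input. Mutating the input is only safe as long as every task works on its own deserialized copy of the records; copying removes that dependency at the cost of one small allocation per record. To stay runnable without Spark, the sketch implements java.util.function.Function rather than Spark's org.apache.spark.api.java.function.Function (the body of apply() would go into call()). The NeuralNet interface and the double[] payload type are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a neural net: maps an input vector to an output vector.
interface NeuralNet {
    double[] evaluate(double[] input);
}

// Mapper that returns a new map and leaves its input untouched.
final class CopyingNeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>> {
    private final NeuralNet net;

    CopyingNeuralNetMapper(NeuralNet net) {
        this.net = net;
    }

    @Override
    public Map<String, Object> apply(Map<String, Object> input) {
        double[] features = (double[]) input.get("list");  // assumes the payload is a double[]
        Map<String, Object> output = new HashMap<>(input); // shallow copy keeps msgNo, timeOfProc, ...
        output.put("list", net.evaluate(features));        // only the copy receives the net output
        return output;
    }
}
```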
>
>
>
>
> The aggregator is implemented as a single class of the following form:
>
>     public class JavaSyncBarrier implements
>             Function<JavaRDD<Map<String, Object>>, Void>
>
>
>
> This class maintains a Google Guava cache of the neural net outputs it has
> received, in the form <Long, List<Map<String, Object>>>, where the Long key
> is the msgNo and the list contains all maps carrying that message number.
>
> When a new map is received, it is added to the cache and the length of its
> list is compared to the total number of neural nets. If these numbers match,
> that message number is declared fully processed, and the difference between
> the current system time and timeOfProc (all maps with the same msgNo have the
> same timeOfProc) is displayed as the total processing time.
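The bookkeeping described above can be sketched without Spark or Guava as a plain HashMap from msgNo to the number of outputs seen so far; keeping a count instead of the list of maps is a simplification made in this sketch, not the original design, and the class name is hypothetical. Two caveats apply when using it inside a Spark job: the function passed to foreachRDD runs on the driver, so the records have to be brought there first (for example with rdd.collect()), and if an output were ever delivered twice a bare count would over-count, so tracking which nets have reported would be more robust.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical barrier: counts outputs per msgNo and reports latency on completion.
final class SyncBarrierSketch {
    private final int numberOfNets;
    private final Map<Long, Integer> seen = new HashMap<>(); // msgNo -> outputs received

    SyncBarrierSketch(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    // Records one neural net output. Returns the end-to-end latency in milliseconds
    // if this output completes its message, or an empty result otherwise.
    OptionalLong offer(Map<String, Object> output, long nowMillis) {
        long msgNo = (Long) output.get("msgNo");
        int count = seen.merge(msgNo, 1, Integer::sum);
        if (count < numberOfNets) {
            return OptionalLong.empty();
        }
        seen.remove(msgNo); // completed messages no longer occupy memory
        long emittedAt = (Long) output.get("timeOfProc");
        return OptionalLong.of(nowMillis - emittedAt);
    }

    // Number of messages still waiting for outputs.
    int inFlight() {
        return seen.size();
    }
}
```

Passing nowMillis in, rather than reading the clock inside offer(), keeps the logic testable; in the real job the caller would pass System.currentTimeMillis().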
>
>
>
>
>
> Now the way all these components are linked together is the following:
>
> public static void main(String[] args) {
>
>         SparkConf conf = new SparkConf();
>         conf.setAppName("SimpleSparkStreamingTest");
>
>         // Batch interval of 1000 ms (1 second)
>         JavaStreamingContext jssc = new JavaStreamingContext(conf,
>                 new Duration(1000));
>
>         jssc.checkpoint("/tmp/spark-tempdir");
>
>         // Generator config goes here
>         // Set to emit new message every 1 second
>         // ---
>
>         // Neural net config goes here
>         // ---
>
>         JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
>                 .receiverStream(new JavaRandomReceiver(generatorConfig));
>
>         // One mapped stream per neural net, all fed by the same receiver stream
>         List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
>                 new ArrayList<JavaDStream<Map<String, Object>>>();
>
>         for (int i = 0; i < numberOfNets; i++) {
>                 neuralNetOutputStreams.add(
>                         rndLists.map(new NeuralNetMapper(neuralNetConfig))
>                 );
>         }
>
>         JavaDStream<Map<String, Object>> joined =
>                 joinStreams(neuralNetOutputStreams);
>
>         // foreachRDD replaces the deprecated DStream.foreach
>         joined.foreachRDD(new JavaSyncBarrier(numberOfNets));
>
>         jssc.start();
>         jssc.awaitTermination();
> }
>
> where joinStreams unifies a list of streams:
>
>         public static <T> JavaDStream<T> joinStreams(
>                 List<JavaDStream<T>> streams) {
>
>                 // Left fold: streams[0].union(streams[1]).union(streams[2])...
>                 JavaDStream<T> result = streams.get(0);
>                 for (int i = 1; i < streams.size(); i++) {
>                         result = result.union(streams.get(i));
>                 }
>
>                 return result;
>         }
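Since every mapped stream is derived from the same rndLists stream, the unioned RDD for a given batch should already contain all N outputs for every message received in that batch interval, which suggests the barrier may not need to carry state across batches at all. The plain-list simulation below illustrates this for one batch; lists stand in for RDDs, identity functions stand in for the nets, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Local simulation of one batch of the pipeline, with lists in place of RDDs.
final class BatchSimulation {
    // Mirrors rndLists.map(...) N times followed by joinStreams(): every mapper sees
    // the same batch, and the results are concatenated (union does not deduplicate).
    static <T, R> List<R> fanOutAndUnion(List<T> batch, List<Function<T, R>> mappers) {
        List<R> unioned = new ArrayList<>();
        for (Function<T, R> mapper : mappers) {
            for (T element : batch) {
                unioned.add(mapper.apply(element));
            }
        }
        return unioned;
    }

    // Counts outputs per message within the batch.
    static <R> Map<Long, Integer> countPerMessage(List<R> outputs, Function<R, Long> msgNoOf) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (R output : outputs) {
            counts.merge(msgNoOf.apply(output), 1, Integer::sum);
        }
        return counts;
    }
}
```

As an aside, JavaStreamingContext also provides union(first, rest), which could replace the hand-written loop, e.g. jssc.union(streams.get(0), streams.subList(1, streams.size())).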
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


-- 

Santiago M. Mola


