Posted to user@spark.apache.org by Santiago Mola <sm...@stratio.com> on 2014/09/15 09:32:50 UTC
Re: Developing a spark streaming application
Just for the record, this is being discussed at StackOverflow:
http://stackoverflow.com/questions/25663026/developing-a-spark-streaming-application/25766618
2014-08-27 10:28 GMT+02:00 Filip Andrei <an...@gmail.com>:
> Hey guys, so the problem I'm trying to tackle is the following:
>
> - I need a data source that emits messages at a certain frequency
> - There are N neural nets that need to process each message individually
> - The outputs from all neural nets are aggregated and only when all N
> outputs for each message are collected, should a message be declared fully
> processed
> - At the end I should measure the time it took for a message to be fully
> processed (time between when it was emitted and when all N neural net
> outputs from that message have been collected)
>
>
> What I'm mostly interested in is whether I approached the problem correctly
> in the first place and, if so, some best-practice pointers on my approach.
>
> My current implementation is the following:
>
>
> For a data source I created the class
>     public class JavaRandomReceiver extends Receiver<Map<String, Object>>
>
> as I decided a key-value store would be best suited to holding emitted
> data.
>
>
> The onStart() method initializes a custom random sequence generator and
> starts a thread that continuously generates new neural net inputs and
> stores them as follows:
>
> SensorData sdata = generator.createSensorData();
>
> Map<String, Object> result = new HashMap<String, Object>();
>
> result.put("msgNo", sdata.getMsgNo());
> result.put("sensorTime", sdata.getSampleTime());
> result.put("list", sdata.getPayload());
> result.put("timeOfProc", sdata.getCreationTime());
>
> store(result);
>
> // sleeps for a given amount of time set at generator creation
> generator.waitForNextTuple();
>
> The msgNo here is incremented for each newly created message and is used to
> keep
>
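The generator loop described above can be sketched in plain Java, without the Spark Receiver API. This is only an illustration under assumptions: RandomMessageSource, its seed and payloadSize parameters, and the Consumer sink (standing in for the receiver's store(...) call) are hypothetical names, and both timestamps are simply set to the current system time because the original SensorData class is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;

// Hypothetical stand-in for the receiver's generator thread; not Spark code.
class RandomMessageSource {
    private final Random random;
    private final int payloadSize;
    private long nextMsgNo = 0L;

    RandomMessageSource(long seed, int payloadSize) {
        this.random = new Random(seed);
        this.payloadSize = payloadSize;
    }

    // Builds one message with the same four keys used in the post.
    Map<String, Object> nextMessage() {
        List<Double> payload = new ArrayList<>();
        for (int i = 0; i < payloadSize; i++) {
            payload.add(random.nextDouble());
        }
        long now = System.currentTimeMillis(); // assumption: one clock for both timestamps
        Map<String, Object> result = new HashMap<>();
        result.put("msgNo", nextMsgNo++);      // incremented for every new message
        result.put("sensorTime", now);
        result.put("list", payload);
        result.put("timeOfProc", now);
        return result;
    }

    // Emits `count` messages to `sink`, sleeping `periodMillis` after each one.
    void emit(int count, long periodMillis, Consumer<Map<String, Object>> sink) {
        for (int i = 0; i < count; i++) {
            sink.accept(nextMessage());
            try {
                Thread.sleep(periodMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            }
        }
    }

    public static void main(String[] args) {
        RandomMessageSource source = new RandomMessageSource(42L, 3);
        source.emit(3, 10L, msg -> System.out.println(msg.get("msgNo") + " -> " + msg.get("list")));
    }
}
```

In a real receiver the sink would be the store(...) call, and the loop would run on the thread started in onStart().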
>
> The neural net functionality is added by creating a custom mapper
>     public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>>
>
> whose call function basically just takes the input map, plugs its "list"
> object as the input to the neural net object, replaces the map's initial
> list with the neural net output and returns the modified map.
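A minimal sketch of such a mapper, assuming the payload is a List<Double>: here java.util.function.Function stands in for Spark's Function interface, and NeuralNetMapperSketch and sumNet are hypothetical names (sumNet is a toy "network" that just sums its inputs). Unlike the description above, this version returns a copy of the map instead of modifying the input, which may be safer when the same input stream feeds several mappers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical mapper; java.util.function.Function replaces Spark's Function here.
class NeuralNetMapperSketch implements Function<Map<String, Object>, Map<String, Object>> {
    private final Function<List<Double>, List<Double>> net;

    NeuralNetMapperSketch(Function<List<Double>, List<Double>> net) {
        this.net = net;
    }

    @Override
    public Map<String, Object> apply(Map<String, Object> input) {
        @SuppressWarnings("unchecked")
        List<Double> payload = (List<Double>) input.get("list");
        // Shallow copy so msgNo and the timestamps carry over and the input stays untouched.
        Map<String, Object> output = new HashMap<>(input);
        output.put("list", net.apply(payload));
        return output;
    }

    // Toy stand-in for a neural net: returns the sum of its inputs as a one-element list.
    static List<Double> sumNet(List<Double> in) {
        double sum = 0.0;
        for (double v : in) {
            sum += v;
        }
        return Collections.singletonList(sum);
    }

    public static void main(String[] args) {
        Map<String, Object> msg = new HashMap<>();
        msg.put("msgNo", 0L);
        msg.put("list", Arrays.asList(1.0, 2.0, 3.0));
        NeuralNetMapperSketch mapper = new NeuralNetMapperSketch(NeuralNetMapperSketch::sumNet);
        System.out.println(mapper.apply(msg).get("list"));
    }
}
```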
>
>
>
>
> The aggregator is implemented as a single class that has the following form
>
>     public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void>
>
>
>
> This class maintains a Google Guava cache of the neural net outputs it has
> received, in the form
>     <Long, List<Map<String, Object>>>
> where the Long value is the msgNo and the list contains all maps with that
> message number.
>
> When a new map is received, it is added to the cache, its list's length is
> compared to the total number of neural nets and, if these numbers match,
> that message number is said to be fully processed and a difference between
> timeOfProc (all maps with the same msgNo have the same timeOfProc) and the
> current system time is displayed as the total time necessary for
> processing.
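The barrier logic described above can be sketched as follows. This is not the JavaSyncBarrier class from the post: SyncBarrierSketch, offer and pendingCount are hypothetical names, a plain HashMap replaces the Guava cache (so nothing is ever evicted), and the current time is passed in as a parameter to keep the example deterministic. The sketch is single-threaded and keeps all state in one JVM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-JVM barrier; a HashMap stands in for the Guava cache.
class SyncBarrierSketch {
    private final int numberOfNets;
    private final Map<Long, List<Map<String, Object>>> pending = new HashMap<>();

    SyncBarrierSketch(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    // Records one neural net output. Returns the end-to-end latency in
    // milliseconds once all N outputs for its msgNo have arrived, else -1.
    long offer(Map<String, Object> output, long nowMillis) {
        Long msgNo = (Long) output.get("msgNo");
        List<Map<String, Object>> outputs =
            pending.computeIfAbsent(msgNo, k -> new ArrayList<>());
        outputs.add(output);
        if (outputs.size() < numberOfNets) {
            return -1L; // still waiting for other nets
        }
        pending.remove(msgNo); // message fully processed; free its entry
        long emittedAt = (Long) output.get("timeOfProc");
        return nowMillis - emittedAt;
    }

    // Number of messages that are still missing at least one output.
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        SyncBarrierSketch barrier = new SyncBarrierSketch(2);
        Map<String, Object> out = new HashMap<>();
        out.put("msgNo", 0L);
        out.put("timeOfProc", System.currentTimeMillis());
        barrier.offer(out, System.currentTimeMillis());
        long latency = barrier.offer(new HashMap<>(out), System.currentTimeMillis());
        System.out.println("msgNo 0 fully processed in " + latency + " ms");
    }
}
```

Because the latency is a difference between two system clock readings, it is only meaningful if the emitting and collecting sides run on the same machine or have synchronized clocks.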
>
>
>
>
>
> Now the way all these components are linked together is the following:
>
> public static void main(String[] args) {
>
>     SparkConf conf = new SparkConf();
>     conf.setAppName("SimpleSparkStreamingTest");
>
>     // Batch interval of 1000 ms
>     JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
>
>     jssc.checkpoint("/tmp/spark-tempdir");
>
>     // Generator config goes here
>     // Set to emit new message every 1 second
>     // ---
>
>     // Neural net config goes here
>     // ---
>
>     JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
>         .receiverStream(new JavaRandomReceiver(generatorConfig));
>
>     List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
>         new ArrayList<JavaDStream<Map<String, Object>>>();
>
>     // Each neural net maps over the same input stream
>     for (int i = 0; i < numberOfNets; i++) {
>         neuralNetOutputStreams.add(
>             rndLists.map(new NeuralNetMapper(neuralNetConfig)));
>     }
>
>     // Merge the N output streams into a single stream
>     JavaDStream<Map<String, Object>> joined =
>         joinStreams(neuralNetOutputStreams);
>
>     joined.foreach(new JavaSyncBarrier(numberOfNets));
>
>     jssc.start();
>     jssc.awaitTermination();
> }
>
> where joinStreams unifies a list of streams:
> public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {
>
>     JavaDStream<T> result = streams.get(0);
>     for (int i = 1; i < streams.size(); i++) {
>         result = result.union(streams.get(i));
>     }
>
>     return result;
> }
>
--
Santiago M. Mola
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: 91 352 59 42 // @stratiobd