Posted to user@spark.apache.org by vatsal <va...@live.com> on 2016/09/30 14:05:42 UTC

Stopping Spark Streaming context on encountering a certain type of message on Kafka

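In my Spark Streaming application I am reading data from a certain Kafka topic.
While reading from the topic, whenever I encounter a certain message (for
example: "poison") I want to stop the streaming. Currently I am achieving this
using the following code, where jsc is an instance of JavaStreamingContext and
directStream is an instance of JavaPairInputDStream:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;

// sc is assumed to be the underlying SparkContext, e.g. jsc.sparkContext().sc()
LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");

directStream.foreachRDD(rdd -> {
    RDD<Row> rows = rdd.values().map(value -> {
        if (value.equals("poison")) {
            // Runs on the executors; the driver reads the total via value()
            poisonNotifier.add(1);
        } else {
            ...
        }
        return row;
    }).rdd();
    // map() is a lazy transformation, so poisonNotifier is only updated
    // when an action is run on rows
});

jsc.start();

// Watch the accumulator from a separate thread; once a poison message has
// been counted, stop the StreamingContext gracefully but keep the
// SparkContext alive: stop(stopSparkContext = false, stopGracefully = true)
ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();
poisonMonitor.execute(() -> {
    while (true) {
        if (poisonNotifier.value() > 0) {
            jsc.stop(false, true);
            break;
        }
        try {
            Thread.sleep(1000); // poll once per second instead of busy-waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
});
try {
    jsc.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}
poisonMonitor.shutdown();

Although this approach is working, it doesn't seem like the right approach to
me. Is there a better (cleaner) way to achieve the same?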



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reposting the same message with proper formatting; sorry for the extra mail.

Posted by vatsal <va...@live.com>.
In my Spark Streaming application I am reading data from a certain Kafka topic.
While reading from the topic, whenever I encounter a certain message (for
example: "poison") I want to stop the streaming. Currently I am achieving this
using the following code, where jsc is an instance of JavaStreamingContext and
directStream is an instance of JavaPairInputDStream:

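import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;

// sc is assumed to be the underlying SparkContext, e.g. jsc.sparkContext().sc()
LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");

directStream.foreachRDD(rdd -> {
    RDD<Row> rows = rdd.values().map(value -> {
        if (value.equals("poison")) {
            // Runs on the executors; the driver reads the total via value()
            poisonNotifier.add(1);
        } else {
            ...
        }
        return row;
    }).rdd();
    // map() is a lazy transformation, so poisonNotifier is only updated
    // when an action is run on rows
});

jsc.start();

// Watch the accumulator from a separate thread; once a poison message has
// been counted, stop the StreamingContext gracefully but keep the
// SparkContext alive: stop(stopSparkContext = false, stopGracefully = true)
ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();
poisonMonitor.execute(() -> {
    while (true) {
        if (poisonNotifier.value() > 0) {
            jsc.stop(false, true);
            break;
        }
        try {
            Thread.sleep(1000); // poll once per second instead of busy-waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
});
try {
    jsc.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}
poisonMonitor.shutdown();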

Although this approach is working, it doesn't seem like the right approach to
me. Is there a better (cleaner) way to achieve the same?
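For illustration, here is a minimal, stdlib-only Java sketch of the same stop-on-poison flow. Everything in it is hypothetical and not a Spark API: PoisonStopSketch stands in for the streaming context, an AtomicLong stands in for the LongAccumulator, a worker thread stands in for the streaming job, and the method awaitTerminationOrTimeout only mirrors the name of JavaStreamingContext.awaitTerminationOrTimeout(long). The idea it models, under those assumptions, is that the driver thread itself waits in short timed slices and checks the counter between waits, so neither a separate monitor executor nor a busy-wait loop is needed. A graceful stop is modeled as draining the batches already received before exiting.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a streaming context; none of these names are Spark APIs.
public class PoisonStopSketch {
    private final BlockingQueue<List<String>> received = new LinkedBlockingQueue<>();
    private final AtomicLong poisonCount = new AtomicLong(); // plays the LongAccumulator
    private final AtomicLong batchesProcessed = new AtomicLong();
    private volatile boolean stopRequested = false;
    private final Thread worker = new Thread(this::runBatches, "stream-worker");

    // Queue a batch of messages, like data received from Kafka.
    public void receive(List<String> batch) {
        received.add(batch);
    }

    public void start() {
        worker.start();
    }

    // Graceful stop: batches already received are still processed before exit.
    public void stopGracefully() {
        stopRequested = true;
    }

    // Returns true if the worker has terminated, false if the timeout elapsed first.
    public boolean awaitTerminationOrTimeout(long millis) throws InterruptedException {
        worker.join(millis);
        return !worker.isAlive();
    }

    public long poisonCount() {
        return poisonCount.get();
    }

    public long batchesProcessed() {
        return batchesProcessed.get();
    }

    // The "streaming job": process batches until a stop is requested and the queue is drained.
    private void runBatches() {
        try {
            while (true) {
                List<String> batch = received.poll(10, TimeUnit.MILLISECONDS);
                if (batch == null) {
                    if (stopRequested) {
                        return; // stop requested and nothing left to drain
                    }
                    continue; // no new data yet
                }
                for (String value : batch) {
                    if (value.equals("poison")) {
                        poisonCount.incrementAndGet();
                    }
                    // normal per-message processing would go here
                }
                batchesProcessed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Driver side: wait in 50 ms slices on the calling thread and request a
    // graceful stop once poison has been counted. Loops until the worker
    // terminates, so the input is expected to contain a "poison" message.
    public static PoisonStopSketch runUntilPoison(List<List<String>> batches) {
        PoisonStopSketch ctx = new PoisonStopSketch();
        batches.forEach(ctx::receive);
        ctx.start();
        try {
            while (!ctx.awaitTerminationOrTimeout(50)) {
                if (ctx.poisonCount() > 0) {
                    ctx.stopGracefully();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ctx;
    }
}
```

In Spark terms, the loop in runUntilPoison would correspond to calling jsc.awaitTerminationOrTimeout(...) on the driver after jsc.start() and calling jsc.stop(false, true) once poisonNotifier.value() > 0. The trade-off is latency: the stop can lag the poison message by up to one wait slice plus the time needed to finish the batches already received.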



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822p27823.html
