Posted to user@spark.apache.org by vatsal <va...@live.com> on 2016/09/30 14:05:42 UTC
Stopping Spark Streaming context on encountering a certain type of
message on Kafka
In my Spark Streaming application I am reading data from a certain Kafka
topic. While reading from the topic, whenever I encounter a certain message
(for example: "poison") I want to stop the streaming. Currently I am achieving
this using the following code, where jsc is an instance of
JavaStreamingContext and directStream is an instance of JavaPairInputDStream:

    LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");
    directStream.foreachRDD(rdd -> {
        RDD<Row> rows = rdd.values().map(value -> {
            if (value.equals("poison")) {
                poisonNotifier.add(1);
            } else {
                ...
            }
            return row;
        }).rdd();
    });
    jsc.start();
    ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();
    poisonMonitor.execute(() -> {
        while (true) {
            if (poisonNotifier.value() > 0) {
                jsc.stop(false, true);
                break;
            }
        }
    });
    try {
        jsc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    poisonMonitor.shutdown();

Although this approach works, it doesn't seem like the right approach to me.
Is there a better (cleaner) way to achieve the same thing?
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Replying same post with proper formatting. - sorry for extra mail
Posted by vatsal <va...@live.com>.
In my Spark Streaming application I am reading data from a certain Kafka
topic. While reading from the topic, whenever I encounter a certain message
(for example: "poison") I want to stop the streaming. Currently I am achieving
this using the following code, where jsc is an instance of
JavaStreamingContext and directStream is an instance of JavaPairInputDStream:
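
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.sql.Row;  // assumption: Row is Spark SQL's Row
    import org.apache.spark.util.LongAccumulator;

    // Counter that tasks increment when they see the poison message;
    // its value is read back on the driver.
    LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");

    directStream.foreachRDD(rdd -> {
        RDD<Row> rows = rdd.values().map(value -> {
            if (value.equals("poison")) {
                poisonNotifier.add(1);
            } else {
                ...
            }
            return row;
        }).rdd();
    });

    jsc.start();

    // Background thread on the driver that polls the accumulator and stops
    // the streaming context once a poison message has been counted.
    ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();
    poisonMonitor.execute(() -> {
        while (true) {
            if (poisonNotifier.value() > 0) {
                // stop(stopSparkContext = false, stopGracefully = true)
                jsc.stop(false, true);
                break;
            }
        }
    });

    try {
        jsc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    poisonMonitor.shutdown();
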
Although this approach works, it doesn't seem like the right approach to me.
Is there a better (cleaner) way to achieve the same thing?
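
One part of the code above that stands out is the monitor loop, which checks
poisonNotifier.value() continuously without ever pausing. Below is a minimal
sketch of the same check done with a bounded wait instead. It uses only
java.util.concurrent stand-ins so that it runs without Spark: the AtomicLong
takes the place of the accumulator and the CountDownLatch takes the place of
the streaming context's termination signal. The class and method names are
hypothetical, chosen for illustration. In Spark,
JavaStreamingContext.awaitTerminationOrTimeout(long) looks like the
corresponding bounded wait, but this sketch has not been tried against a real
streaming job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: none of these names come from Spark.
class PoisonPollSketch {

    /**
     * Blocks until either the latch is released (normal termination) or the
     * poison counter becomes positive, checking once per pollMillis.
     * Returns true only if the wait ended because poison was seen.
     */
    static boolean awaitPoisonOrTermination(AtomicLong poisonCount,
                                            CountDownLatch terminated,
                                            long pollMillis) {
        try {
            // await(...) sleeps for up to pollMillis, so the loop does not spin.
            while (!terminated.await(pollMillis, TimeUnit.MILLISECONDS)) {
                if (poisonCount.get() > 0) {
                    terminated.countDown(); // stand-in for jsc.stop(false, true)
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong poisonCount = new AtomicLong();
        CountDownLatch terminated = new CountDownLatch(1);

        // Stand-in for the streaming job: counts the poison message it sees.
        Thread worker = new Thread(() -> {
            for (String value : new String[] {"a", "b", "poison"}) {
                if ("poison".equals(value)) {
                    poisonCount.incrementAndGet();
                }
            }
        });
        worker.start();

        boolean stoppedByPoison =
                awaitPoisonOrTermination(poisonCount, terminated, 50);
        worker.join();
        System.out.println("stopped by poison: " + stoppedByPoison);
    }
}
```

Because the wait runs on the calling thread, a variant like this would not
need the separate ExecutorService. The poll interval is a trade-off between
how quickly the stop is noticed and how often the counter is read.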
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822p27823.html