Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2016/09/10 20:17:04 UTC
Not sure why Filter on DStream doesn't get invoked?
Hi All,

I am trying to simplify my question, so my code is below. I see that BAR gets printed but FOO does not, and I am not sure why. My batch interval is 1 second (something I pass in when I create the streaming context). Any idea?

I have a bunch of events, and every second I want to store the number of events whose status == "Pending" (no prior state needed).
jsonMessagesDStream
    .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            System.out.println("****************FOO******************");
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    })
    .foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            System.out.println("*****************BAR******************");
            store(stringJavaRDD.count());
        }
    });
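One way to rule out the predicate logic itself is to pull it out into a plain static method and exercise it off-cluster. The sketch below is hypothetical: the class and method names are my own, and a regex stands in for the Gson parsing so it compiles standalone, without Spark or Gson on the classpath (assumption: flat JSON with a string "status" field).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of the filter predicate from the pipeline above.
// A regex replaces the Gson parsing purely so this runs without extra
// dependencies; the names here are hypothetical.
public class PendingFilterSketch {

    private static final Pattern STATUS =
        Pattern.compile("\"status\"\\s*:\\s*\"([^\"]*)\"");

    // Mirrors the body of Function<String, Boolean>.call(v1).
    public static boolean isPending(String json) {
        if (json == null) {
            return false;
        }
        Matcher m = STATUS.matcher(json);
        // Return false when the "status" field is absent, like the original.
        return m.find() && m.group(1).equalsIgnoreCase("Pending");
    }

    public static void main(String[] args) {
        System.out.println(isPending("{\"status\":\"Pending\"}"));   // true
        System.out.println(isPending("{\"status\":\"pending\"}"));   // true (case-insensitive)
        System.out.println(isPending("{\"status\":\"Completed\"}")); // false
        System.out.println(isPending("{\"id\":42}"));                // false (no status field)
    }
}
```

Separately, note that even a correct predicate will not print FOO on the driver console: the function passed to filter runs on the executors, so its System.out goes to the executor logs, whereas the function passed to foreachRDD is invoked on the driver once per batch interval, even for empty batches, which is why BAR appears every second.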