Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2016/09/10 20:17:04 UTC

Not sure why Filter on DStream doesn't get invoked?

Hi All,

I am trying to simplify how to frame my question, so below is my code. I see
that BAR gets printed but FOO does not, and I am not sure why. My batch interval
is 1 second (something I pass in when I create the Spark streaming context). Any idea?
I have a bunch of events and I want to store the number of events where
status == "Pending" every second (no prior state is needed).

jsonMessagesDStream
    .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            System.out.println("****************FOO******************");
            // Parse the incoming JSON message and keep only "Pending" events
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    })
    .foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            System.out.println("*****************BAR******************");
            // Count the filtered ("Pending") events in this batch and store the result
            store(stringJavaRDD.count());
        }
    });