You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Marco1982 <ma...@yahoo.it> on 2016/06/10 20:19:40 UTC

Neither previous window has value for key, nor new values found.

Hi all,

I'm running a Spark Streaming application that uses reduceByKeyAndWindow().
The window interval is 2 hours, while the slide interval is 1 hour. I have a
JavaPairRDD in which both keys and values are strings. Each time the
reduceByKeyAndWindow() function is called, it uses appendString() and
removeString() functions below to incrementally build a windowed stream of
data:

Function2<String, String, String> appendString = new Function2<String,
String, String>() {
      @Override
      public String call(String s1, String s2) {
        return s1 + s2;
      }
    };

    Function2<String, String, String> removeString = new Function2<String,
String, String>() {
      @Override
      public String call(String s1, String s2) {
        return s1.replace(s2, "");
      }
    };

filterEmptyRecords() removes keys that eventually won't contain any value:

    Function<scala.Tuple2&lt;String, String>, Boolean> filterEmptyRecords =
new Function<scala.Tuple2&lt;String, String>, Boolean>() {
      @Override
      public Boolean call(scala.Tuple2<String, String> t) {
        return (!t._2().isEmpty());
      }
    };

The windowed operation is then:

JavaPairDStream<String, String> cdr_kv =
cdr_filtered.reduceByKeyAndWindow(appendString, removeString,
Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
PARTITIONS, filterEmptyRecords);

After a few hours of operation, this function raises the following
exception:
"Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?"

I've found this post from 2013:
https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
which however doesn't solve my problem. I'm using String to represent keys,
which I'm pretty sure hash consistently.

Any clue why this happens and possible suggestions to fix it?

Thanks!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Neither-previous-window-has-value-for-key-nor-new-values-found-tp27140.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Neither previous window has value for key, nor new values found.

Posted by N B <nb...@gmail.com>.
That post from TD that you reference has a good explanation of the issue
you are encountering. The issue in my view here is that the reduce and the
inverseReduce function that you have specified are not perfect opposites of
each other. Consider the following strings:

"a"
"b"
"a"

forward reduce will form them into "aba".
When the first "a" falls off the window, your inverse reduce function will
produce "b" and not "ba" as would be required by the opposite of the
concatenation. Now when the second "a" falls off, the check for this being
inconsistent is triggered with the exception you see.

HTH
NB


On Fri, Jun 10, 2016 at 1:19 PM, Marco1982 <ma...@yahoo.it> wrote:

> Hi all,
>
> I'm running a Spark Streaming application that uses reduceByKeyAndWindow().
> The window interval is 2 hours, while the slide interval is 1 hour. I have
> a
> JavaPairRDD in which both keys and values are strings. Each time the
> reduceByKeyAndWindow() function is called, it uses appendString() and
> removeString() functions below to incrementally build a windowed stream of
> data:
>
> Function2<String, String, String> appendString = new Function2<String,
> String, String>() {
>       @Override
>       public String call(String s1, String s2) {
>         return s1 + s2;
>       }
>     };
>
>     Function2<String, String, String> removeString = new Function2<String,
> String, String>() {
>       @Override
>       public String call(String s1, String s2) {
>         return s1.replace(s2, "");
>       }
>     };
>
> filterEmptyRecords() removes keys that eventually won't contain any value:
>
>     Function<scala.Tuple2&lt;String, String>, Boolean> filterEmptyRecords =
> new Function<scala.Tuple2&lt;String, String>, Boolean>() {
>       @Override
>       public Boolean call(scala.Tuple2<String, String> t) {
>         return (!t._2().isEmpty());
>       }
>     };
>
> The windowed operation is then:
>
> JavaPairDStream<String, String> cdr_kv =
> cdr_filtered.reduceByKeyAndWindow(appendString, removeString,
> Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
> PARTITIONS, filterEmptyRecords);
>
> After a few hours of operation, this function raises the following
> exception:
> "Neither previous window has value for key, nor new values found. Are you
> sure your key class hashes consistently?"
>
> I've found this post from 2013:
> https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
> which however doesn't solve my problem. I'm using String to represent keys,
> which I'm pretty sure hash consistently.
>
> Any clue why this happens and possible suggestions to fix it?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Neither-previous-window-has-value-for-key-nor-new-values-found-tp27140.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>