Posted to user@spark.apache.org by Marco Platania <ma...@yahoo.it.INVALID> on 2016/06/11 00:34:01 UTC

Neither previous window has value for key, nor new values found

Hi all, 

I'm running a Spark Streaming application that uses reduceByKeyAndWindow(). The window interval is 2 hours and the slide interval is 1 hour. I have a JavaPairDStream in which both keys and values are strings. At each slide, reduceByKeyAndWindow() uses the appendString() and removeString() functions below to incrementally build a windowed stream of data: 

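    import org.apache.spark.api.java.function.Function2;

    // Reduce function: appends the value of a new slide to the window value.
    Function2<String, String, String> appendString = new Function2<String, String, String>() {
      @Override
      public String call(String s1, String s2) {
        return s1 + s2;
      }
    };

    // Inverse reduce function: removes the value of the slide leaving the window.
    // String.replace() removes every occurrence of s2 from s1.
    Function2<String, String, String> removeString = new Function2<String, String, String>() {
      @Override
      public String call(String s1, String s2) {
        return s1.replace(s2, "");
      }
    };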
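The Spark API docs describe the inverse function of reduceByKeyAndWindow() as one that satisfies invReduceFunc(reduceFunc(x, y), x) = y, i.e. removing the value of the slide that leaves the window must give back exactly what remains. A minimal plain-Java sketch of that check follows (no Spark needed: BinaryOperator stands in for Function2, and the class name, the sample value "x;" and the prefix-stripping variant are hypothetical). It assumes the leaving slide's value always sits at the front of the concatenated window value, which holds if values are appended in time order.

```java
import java.util.function.BinaryOperator;

public class InverseCheck {
    // Same logic as appendString: concatenate the window value and the new value.
    static final BinaryOperator<String> REMOVE_ALL = (s1, s2) -> s1.replace(s2, "");
    static final BinaryOperator<String> APPEND = (s1, s2) -> s1 + s2;

    // Hypothetical alternative to removeString: strip exactly one copy of s2 from the front,
    // assuming the leaving slide's value is always a prefix of the window value.
    static final BinaryOperator<String> REMOVE_PREFIX = (s1, s2) -> {
        if (!s1.startsWith(s2)) {
            throw new IllegalStateException("expected \"" + s2 + "\" as a prefix of \"" + s1 + "\"");
        }
        return s1.substring(s2.length());
    };

    // An inverse is valid only if remove(append(older, newer), older) equals newer.
    static boolean isInverse(BinaryOperator<String> remove, String older, String newer) {
        return remove.apply(APPEND.apply(older, newer), older).equals(newer);
    }

    public static void main(String[] args) {
        // Two consecutive slides contribute the same string for one key.
        String older = "x;";
        String newer = "x;";
        // REMOVE_ALL mirrors removeString: String.replace drops EVERY occurrence of s2.
        System.out.println("replace-based inverse ok: " + isInverse(REMOVE_ALL, older, newer));
        System.out.println("prefix-based inverse ok:  " + isInverse(REMOVE_PREFIX, older, newer));
    }
}
```

With the replace()-based version, two slides that contribute the same string for a key cancel each other completely, so the check fails; stripping a single prefix keeps the second copy.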

filterEmptyRecords() removes keys that no longer contain any value: 

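    import org.apache.spark.api.java.function.Function;

    // Filter applied to each window: keeps only keys whose value is non-empty.
    Function<scala.Tuple2<String, String>, Boolean> filterEmptyRecords = new Function<scala.Tuple2<String, String>, Boolean>() {
      @Override
      public Boolean call(scala.Tuple2<String, String> t) {
        return !t._2().isEmpty();
      }
    };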

The windowed operation is then: 

    JavaPairDStream<String, String> cdr_kv = cdr_filtered.reduceByKeyAndWindow(
        appendString, removeString,
        Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
        PARTITIONS, filterEmptyRecords);

After a few hours of operation, this function raises the following exception: 
"Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?" 

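One plausible way this combination produces the exception, shown as a simplified per-key model of the incremental update rather than Spark's actual code: in the Spark source, when a key has no value in the previous window, its new value can only be built from values entering the window, and this message is raised if there are none. Here filterEmptyRecords() can drop a key whose value became empty while a slide still inside the window holds data for it; when that slide later leaves, the key appears only among the leaving values. The class and method names, the slide contents, and the prefix-based inverse below are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;

public class WindowSimulation {
    // One incremental window update, modelled on the per-key merge rule of
    // reduceByKeyAndWindow() with an inverse function and a filter (simplified, not Spark code).
    static Map<String, String> step(Map<String, String> prevWindow, Map<String, String> leaving,
                                    Map<String, String> entering, BinaryOperator<String> reduce,
                                    BinaryOperator<String> invReduce) {
        Set<String> keys = new HashSet<>(prevWindow.keySet());
        keys.addAll(leaving.keySet());
        keys.addAll(entering.keySet());
        Map<String, String> next = new HashMap<>();
        for (String key : keys) {
            String prev = prevWindow.get(key);
            String oldVal = leaving.get(key);
            String newVal = entering.get(key);
            String value;
            if (prev == null) {
                // No previous window value: only entering values can produce a result.
                if (newVal == null) {
                    throw new IllegalStateException(
                        "Neither previous window has value for key, nor new values found.");
                }
                value = newVal;
            } else {
                value = prev;
                if (oldVal != null) {
                    value = invReduce.apply(value, oldVal); // remove the slide leaving the window
                }
                if (newVal != null) {
                    value = reduce.apply(value, newVal); // add the slide entering the window
                }
            }
            // Same role as filterEmptyRecords: keys with an empty window value are dropped.
            if (!value.isEmpty()) {
                next.put(key, value);
            }
        }
        return next;
    }

    // Feeds slides one by one into a window that spans `windowSlides` slides.
    static Map<String, String> run(List<Map<String, String>> slides, int windowSlides,
                                   BinaryOperator<String> reduce, BinaryOperator<String> invReduce) {
        Deque<Map<String, String>> inWindow = new ArrayDeque<>();
        Map<String, String> window = new HashMap<>();
        for (Map<String, String> slide : slides) {
            Map<String, String> leaving =
                inWindow.size() == windowSlides ? inWindow.removeFirst() : Map.<String, String>of();
            inWindow.addLast(slide);
            window = step(window, leaving, slide, reduce, invReduce);
        }
        return window;
    }

    public static void main(String[] args) {
        BinaryOperator<String> append = (s1, s2) -> s1 + s2;
        BinaryOperator<String> removeAll = (s1, s2) -> s1.replace(s2, "");
        // Hypothetical inverse: strip one copy of s2 from the front (value left unchanged otherwise).
        BinaryOperator<String> removePrefix =
            (s1, s2) -> s1.startsWith(s2) ? s1.substring(s2.length()) : s1;
        // 2-hour window / 1-hour slide = 2 slides per window; key "k" gets "x;" in two slides.
        List<Map<String, String>> slides = List.of(
            Map.of("k", "x;"), Map.of("k", "x;"), Map.<String, String>of(), Map.<String, String>of());
        System.out.println("prefix-based inverse, final window: " + run(slides, 2, append, removePrefix));
        try {
            run(slides, 2, append, removeAll);
        } catch (IllegalStateException e) {
            System.out.println("replace-based inverse: " + e.getMessage());
        }
    }
}
```

Under these assumptions the replace()-based inverse empties "x;x;" when the first slide leaves, the key is filtered out, and the next update throws; the prefix-based inverse keeps "x;" until the second slide leaves and finishes with an empty window.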
I found this post from 2013: https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
but it doesn't solve my problem: I'm using Strings as keys, and I'm fairly sure they hash consistently. 
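The key side can be checked independently: the java.lang.String documentation specifies hashCode() as s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1] using int arithmetic (0 for the empty string), so equal strings hash identically on every JVM. The sketch below recomputes that formula with Horner's rule and compares it with hashCode(); the class name and sample keys are made up.

```java
public class StringHashCheck {
    // Recomputes String.hashCode() from its documented formula using Horner's rule;
    // int overflow wraps exactly as in the specification.
    static int specHash(String s) {
        int h = 0;
        for (int i = 0; i < s.length(); i++) {
            h = 31 * h + s.charAt(i);
        }
        return h;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"", "a", "cdr-key-1"}) {
            System.out.println("\"" + key + "\" -> " + key.hashCode() + " (formula: " + specHash(key) + ")");
        }
    }
}
```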

Any idea why this happens, or suggestions on how to fix it? 

Thanks!