Posted to user@spark.apache.org by SRK <sw...@gmail.com> on 2017/06/06 18:30:56 UTC

Exception while using ReduceByKeyAndWindow in Spark Streaming.

Hi,

I see the following error when I use ReduceByKeyAndWindow in my Spark
Streaming app. I use reduce, invReduce and filterFunction as shown below.
Any idea as to why I get the error?

 java.lang.Exception: Neither previous window has value for key, nor new
values found. Are you sure your key class hashes consistently?


  def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
    case ((timeStamp1, set1), (timeStamp2, set2)) =>
      (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
  }

  def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
    case ((timeStamp1, set1), (timeStamp2, set2)) =>
      (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
  }

  def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
    case (metricName, (timeStamp, set)) => set.nonEmpty
  }






--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by N B <nb...@gmail.com>.
Hi Swetha,

We dealt with this issue a couple of years ago and solved it. The key
insight was that adding to a HashSet and removing from a HashSet are
actually not inverse operations of each other.

For example, if you added a key K1 in batch1 and then added that same key
K1 again during a later batch, let's say batch9, the inverse function needs
to be able to remove this key *twice* from the HashSet, which is exactly
what a HashSet cannot do. Once the key is removed due to batch1 falling
off, the resulting new HashSet is missing this key, and when the time comes
to remove batch9, it will barf with the error that you are experiencing.

The solution is to maintain a count of how many times you have encountered
that particular key, and to decrement it in your inverse function. Once the
count reaches 0, your filter function should remove that key from
consideration.

We achieved it using a HashMap that maintains counts instead of a Set.
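As an illustration only (this sketch is not from the thread; the object and
function names are assumptions), the three window functions could be
reworked around occurrence counts along these lines:

```scala
import scala.collection.immutable.HashMap

object CountedWindow {
  // Window value: (latest timestamp, occurrence count per visitor).
  type V = (Long, HashMap[String, Int])

  // Forward reduce: merge by adding counts, so a visitor seen in two
  // different batches is remembered twice.
  def reduce(a: V, b: V): V = {
    val merged = b._2.foldLeft(a._2) { case (m, (k, n)) =>
      m.updated(k, m.getOrElse(k, 0) + n)
    }
    (math.max(a._1, b._1), merged)
  }

  // Inverse reduce: subtract the counts of the batch falling out of the
  // window; a visitor disappears only once its count reaches zero.
  def invReduce(acc: V, old: V): V = {
    val remaining = old._2.foldLeft(acc._2) { case (m, (k, n)) =>
      val left = m.getOrElse(k, 0) - n
      if (left <= 0) m - k else m.updated(k, left)
    }
    (acc._1, remaining)
  }

  // Filter: drop a key from the window state once no visitors remain.
  def filterFunc(kv: (String, V)): Boolean = kv._2._2.nonEmpty
}
```

With this shape, removing batch1 decrements K1's count from 2 to 1 instead
of deleting it, so the later removal of batch9 still has something to
subtract.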

Hope this helps,
N B



Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by Tathagata Das <ta...@gmail.com>.
Unfortunately, I am out of ideas. I don't know what's going wrong.
If you can, try using Structured Streaming. We are more active on the
Structured Streaming project.


Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by swetha kasireddy <sw...@gmail.com>.
Hi TD,

I am still seeing this issue with any immutable data structure. Any idea why
this happens? I use scala.collection.immutable.List[String], and my reduce
and inverse reduce do the following:

visitorSet1 ++ visitorSet2

visitorSet1.filterNot(visitorSet2.contains(_))
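To make the failure mode concrete (an illustrative sketch, not code from the
thread): with plain set union and difference, an element that arrives in two
batches is dropped as soon as the first batch leaves the window:

```scala
val batch1 = Set("K1")
val batch9 = Set("K1")            // the same visitor appears again later

// reduce: the union silently absorbs the duplicate arrival
val window = batch1 ++ batch9     // Set("K1")

// inverse reduce: when batch1 falls out of the window, K1 vanishes,
// even though batch9 should still be contributing K1
val afterInv = window.filterNot(batch1.contains(_))
```

This is why switching from mutable to immutable sets does not cure the
error: union and difference are not inverses whenever an element occurs in
more than one batch.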




Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by swetha kasireddy <sw...@gmail.com>.
I changed the data structure to scala.collection.immutable.Set and I still
see the same issue. My key is a String. I do the following in my reduce
and invReduce:

visitorSet1 ++ visitorSet2.toTraversable

visitorSet1 -- visitorSet2.toTraversable


Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by Tathagata Das <ta...@gmail.com>.
Yes, and in general any mutable data structure. You have to use immutable
data structures whose hashCode and equals are consistent enough for being
put in a set.
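One way that consistency requirement breaks down (an illustrative sketch,
not code from the thread): a mutable element's hashCode changes when its
contents change, so a hash-based collection can no longer find it:

```scala
import scala.collection.immutable.HashSet
import scala.collection.mutable

val key = mutable.HashSet("a")
// The element is stored under its hashCode at insertion time.
val container = HashSet[mutable.HashSet[String]](key)

key += "b"                        // mutation changes key.hashCode

// The lookup now hashes to a different slot, so the element is "lost",
// which is the kind of inconsistency the exception message hints at.
val found = container.contains(key)
```

This mirrors the "Are you sure your key class hashes consistently?" wording
in the exception: any value whose hash can drift between insertion and
lookup will eventually go missing.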


Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by swetha kasireddy <sw...@gmail.com>.
Are you suggesting against the usage of HashSet?


Re: Exception while using ReduceByKeyAndWindow in Spark Streaming.

Posted by Tathagata Das <ta...@gmail.com>.
This may be because HashSet is a mutable data structure, and it seems
you are actually mutating it in "set1 ++ set2". I suggest creating a new
HashSet in the function (and adding both sets into it), rather than
mutating one of them.
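Following that suggestion (a sketch with assumed names, not code from the
thread), the reduce side could build a fresh immutable set instead of
touching either input:

```scala
import scala.collection.immutable.HashSet

// Build a brand-new set from both inputs; neither argument is mutated.
def mergeSets(set1: Set[String], set2: Set[String]): Set[String] =
  HashSet.empty[String] ++ set1 ++ set2

val merged = mergeSets(Set("a"), Set("a", "b"))
```

Because the result is a new immutable value, the window state Spark keeps
between batches can never be corrupted by in-place modification.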
