Posted to user@spark.apache.org by spr <sp...@yarcdata.com> on 2014/11/12 01:26:13 UTC

"overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

I am creating a workflow. I have an existing call to updateStateByKey that
works fine, but when I created a second use where the key is a Tuple2, it
now fails with the dreaded "overloaded method value updateStateByKey with
alternatives ... cannot be applied to ...". Comparing the two uses, I'm not
seeing anything that seems broken, though I do note that all the messages
below refer to the Time my code provides as org.apache.spark.streaming.Time.

a) Could the Time vs. org.apache.spark.streaming.Time difference be causing
this?  (I'm using Time the same way in the first use, which appears to work
properly.)

b) Any suggestions for what else could be causing the error?

------code--------
    val ssc = new StreamingContext(conf, Seconds(timeSliceArg))
    ssc.checkpoint(".")

    var lines = ssc.textFileStream(dirArg)

    var linesArray = lines.map( line => line.split("\t"))
    var DnsSvr = linesArray.map( lineArray => (
         (lineArray(4), lineArray(5)),
         (1, Time((lineArray(0).toDouble*1000).toLong)) ))

    val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
      val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
      val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min

      val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))

      (currentCount + previousCount, Seq(minTime, newMinTime).min)
    }

    var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount)  // <=== error here


------compilation output----------
[error] /Users/spr/Documents/.../cyberSCinet.scala:513: overloaded method value updateStateByKey with alternatives:
[error]   (updateFunc: Iterator[((String, String), Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)])] => Iterator[((String, String), (Int, org.apache.spark.streaming.Time))],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)])(implicit evidence$2: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))]
[error]  cannot be applied to ((Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => (Int, org.apache.spark.streaming.Time))
[error]     var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount)
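
Reading the error closely: every alternative takes an updateFunc that returns
Option[(Int, Time)], while the final "cannot be applied to" line shows the
function provided returns the bare (Int, Time). A minimal sketch of the two
shapes (the val names are illustrative, and it assumes
import org.apache.spark.streaming.Time):

    // What every updateStateByKey overload above accepts: the update
    // function must return Option of the state type, so that returning
    // None can drop a key from the state.
    val expectedShape: (Seq[(Int, Time)], Option[(Int, Time)]) => Option[(Int, Time)] =
      (values, state) => Some((0, Time(0L)))   // placeholder body

    // What the code above actually provides: the bare state tuple,
    // which matches none of the alternatives.
    val providedShape: (Seq[(Int, Time)], Option[(Int, Time)]) => (Int, Time) =
      (values, state) => (0, Time(0L))         // placeholder body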



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Posted by Steve Reinhardt <sp...@yarcdata.com>.
I'm missing something simpler (I think).  That is, why do I need a Some instead of a Tuple2?  Because a Some might or might not hold a value, but a Tuple2 always must?  Or something like that?
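
(A minimal sketch of the distinction, per the discussion quoted below; the
function and values here are illustrative, not from the thread:)

    // In updateStateByKey's update function, the Option return value is
    // how Spark distinguishes two outcomes per key:
    //   Some(s) => keep (or replace) this key's state with s
    //   None    => drop this key from the state entirely
    // A bare Tuple2 can only ever say "here is a value", so it could
    // never express "forget this key" -- hence the Option in the signature.
    val updateSketch = (values: Seq[Int], state: Option[Int]) => {
      val total = values.sum + state.getOrElse(0)
      if (total == 0) None          // nothing counted: let Spark discard the key
      else Some(total)              // keep the running total
    }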

From: Adrian Mocanu <am...@verticalscope.com>

You are correct; the filtering I’m talking about is done implicitly. You don’t have to do it yourself. Spark will do it for you and remove those entries from the state collection.



RE: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Posted by Adrian Mocanu <am...@verticalscope.com>.
You are correct; the filtering I’m talking about is done implicitly. You don’t have to do it yourself. Spark will do it for you and remove those entries from the state collection.

From: Yana Kadiyska [mailto:yana.kadiyska@gmail.com]
Sent: November-12-14 3:50 PM
To: Adrian Mocanu
Cc: spr; user@spark.incubator.apache.org
Subject: Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Adrian, do you know if this is documented somewhere? I was also under the impression that setting a key's value to None would cause the key to be discarded (without any explicit filtering on the user's part), but I cannot find any official documentation to that effect.



Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Posted by Yana Kadiyska <ya...@gmail.com>.
Adrian, do you know if this is documented somewhere? I was also under the
impression that setting a key's value to None would cause the key to be
discarded (without any explicit filtering on the user's part), but I cannot
find any official documentation to that effect.

On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu <am...@verticalscope.com> wrote:

> My understanding is that the reason you have an Option is so that you can
> filter out tuples by returning None. This way your state data won't grow
> forever.

RE: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Posted by Adrian Mocanu <am...@verticalscope.com>.
My understanding is that the reason you have an Option is so that you can filter out tuples by returning None. This way your state data won't grow forever.
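
For illustration, a sketch of that pattern applied to the (Int, Time) state
from this thread. The one-hour cutoff and the wall-clock comparison are
assumptions made up for the example, not part of the original code:

    import org.apache.spark.streaming.Time

    val maxAgeMs = 60 * 60 * 1000L  // illustrative expiry window: one hour

    val updateWithExpiry = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
      val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
      val newCount = previousCount + values.map(_._1).sum
      val newMinTime = (minTime +: values.map(_._2)).min
      // Returning None removes this key from the state collection (the
      // implicit filtering described above), so stale keys don't accumulate.
      if (System.currentTimeMillis - newMinTime.milliseconds > maxAgeMs) None
      else Some((newCount, newMinTime))
    }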

-----Original Message-----
From: spr [mailto:spr@yarcdata.com] 
Sent: November-12-14 2:25 PM
To: user@spark.incubator.apache.org
Subject: Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

After comparing with previous code, I got it to work by making the return a Some instead of a Tuple2.  Perhaps some day I will understand this.




Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2

Posted by spr <sp...@yarcdata.com>.
After comparing with previous code, I got it to work by making the return a Some
instead of a Tuple2.  Perhaps some day I will understand this.


spr wrote
> ------code--------
>
>     val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
>       val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
>       val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min
>
>       val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
>
>       //  (currentCount + previousCount, Seq(minTime, newMinTime).min)   // <== old
>       Some(currentCount + previousCount, Seq(minTime, newMinTime).min)   // <== new
>     }
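
With the Some return, the function now has exactly the type that the simplest
overload in the compiler output requires, which can be spelled out as a check
(illustrative; assumes the definition above and import of Time are in scope):

    // The fixed function's type, written out: it returns Option[(Int, Time)],
    // matching updateStateByKey's (Seq[V], Option[S]) => Option[S] overload.
    val checkType: (Seq[(Int, Time)], Option[(Int, Time)]) => Option[(Int, Time)] =
      updateDnsCount

One Scala side note (not from the thread): Some(a, b) compiles only because
Scala adapts the two arguments into a single tuple; Some((a, b)) says the
same thing explicitly.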





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
