Posted to dev@spark.apache.org by Hao Ren <in...@gmail.com> on 2015/12/17 14:49:24 UTC

implicit ClassTag in KafkaUtils

Hi,

I am reading the Spark Streaming Kafka code.

In the org.apache.spark.streaming.kafka.KafkaUtils file, the function
"createDirectStream" takes the key class, value class, etc. and creates
ClassTags from them. However, those ClassTags are all declared implicit,
and I don't understand why.

In fact, I cannot find any other overload of "createDirectStream" that
takes implicit parameters.

So what are these implicit ClassTags used for? Thank you.

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    jssc: JavaStreamingContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
    recordClass: Class[R],
    kafkaParams: JMap[String, String],
    fromOffsets: JMap[TopicAndPartition, JLong],
    messageHandler: JFunction[MessageAndMetadata[K, V], R]
  ): JavaInputDStream[R] = {
  implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
  implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
  implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
  implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
  val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
  createDirectStream[K, V, KD, VD, R](
    jssc.ssc,
    Map(kafkaParams.toSeq: _*),
    Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
    cleanedHandler
  )
}
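
For concreteness, the ClassTag(...) calls themselves just wrap the runtime
Class; a standalone check (plain Scala, no Spark needed, the names are mine):

import scala.reflect.ClassTag

object TagCheck {
  def main(args: Array[String]): Unit = {
    val keyClass: Class[String] = classOf[String]
    // Same construction as in the method above: build a ClassTag from a Class.
    val keyCmt: ClassTag[String] = ClassTag(keyClass)
    println(keyCmt)               // prints: java.lang.String
    println(keyCmt.runtimeClass)  // prints: class java.lang.String
  }
}

What I don't see is where these implicit vals are consumed.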


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: implicit ClassTag in KafkaUtils

Posted by Hao Ren <in...@gmail.com>.
Thank you for your quick answer.
It helped me find the implicit conversion to JavaInputDStream, which
takes an implicit ClassTag.

Cheers.
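
To spell out the pattern, here is a toy, Spark-free sketch; SInput and
JInput are made-up stand-ins for InputDStream and its Java wrapper, not
real Spark classes. The point is that the conversion to the Java-facing
wrapper itself demands a ClassTag, and the implicit vals built from the
Class arguments are what supply it:

import scala.language.implicitConversions
import scala.reflect.ClassTag

// Made-up stand-ins for InputDStream / JavaInputDStream.
class SInput[T: ClassTag]
class JInput[T](val underlying: SInput[T])(implicit val tag: ClassTag[T])

object JInput {
  // The conversion to the Java-facing wrapper needs a ClassTag[T] in scope.
  implicit def fromSInput[T: ClassTag](s: SInput[T]): JInput[T] = new JInput[T](s)
}

object Demo {
  // Mirrors the Java-friendly overload: build the ClassTag from a Class
  // and mark it implicit so both the constructor and the conversion see it.
  def wrap[T](clazz: Class[T]): JInput[T] = {
    implicit val tag: ClassTag[T] = ClassTag(clazz)
    val scalaSide = new SInput[T]  // context bound satisfied by `tag`
    scalaSide                      // implicit conversion to JInput[T] fires here
  }

  def main(args: Array[String]): Unit = {
    println(wrap(classOf[String]).tag)  // prints: java.lang.String
  }
}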

On Thu, Dec 17, 2015 at 3:11 PM, Saisai Shao <sa...@gmail.com> wrote:

> Actually, this is a Scala language question. The Scala createDirectStream
> requires implicit ClassTag values, expressed as context bounds on its type
> parameters. Java has no equivalent mechanism, so this Java-friendly
> overload converts each Java Class into a ClassTag and declares it as an
> implicit value, which the compiler then passes on to the Scala
> createDirectStream.
>
>
> Thanks
> Saisai
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: implicit ClassTag in KafkaUtils

Posted by Saisai Shao <sa...@gmail.com>.
Actually, this is a Scala language question. The Scala createDirectStream
requires implicit ClassTag values, expressed as context bounds on its type
parameters. Java has no equivalent mechanism, so this Java-friendly
overload converts each Java Class into a ClassTag and declares it as an
implicit value, which the compiler then passes on to the Scala
createDirectStream.
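
To make the desugaring concrete, here is a minimal, Spark-free sketch;
build/buildFromClass are made-up names, not Spark API:

import scala.reflect.ClassTag

object ClassTagDemo {
  // Context-bound form, like the Scala createDirectStream signature.
  def build[T: ClassTag](n: Int): Array[T] = new Array[T](n)

  // What the compiler desugars the context bound into.
  def buildDesugared[T](n: Int)(implicit tag: ClassTag[T]): Array[T] = new Array[T](n)

  // The Java-friendly shape: no implicits in the signature, so we
  // manufacture the ClassTag from the Class object and mark it implicit.
  def buildFromClass[T](clazz: Class[T], n: Int): Array[T] = {
    implicit val tag: ClassTag[T] = ClassTag(clazz)
    build[T](n)  // the implicit val above is resolved here
  }

  def main(args: Array[String]): Unit = {
    println(buildFromClass(classOf[String], 3).length)  // prints: 3
  }
}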


Thanks
Saisai

