You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2016/11/16 19:16:00 UTC
Spark 2.0.2 with Kafka source, Error please help!
I am using protobuf to encode. This may not be related to the new release
issue....
Exception in thread "main" scala.ScalaReflectionException: <none> is not a
term
at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
at
scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
at
org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
at
org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.immutable.List.flatMap(List.scala:344)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
at
org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
at
org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
at PersonConsumer$.main(PersonConsumer.scala:33)
at PersonConsumer.main(PersonConsumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
The following is my code ...
object PersonConsumer {
import org.apache.spark.rdd.RDD
import com.trueaccord.scalapb.spark._
import org.apache.spark.sql.{SQLContext, SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
def parseLine(s: String): Person =
Person.parseFrom(
org.apache.commons.codec.binary.Base64.decodeBase64(s))
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
val ds4 = spark.sqlContext.sql("select name from persons")
val query = ds4.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by shyla deshpande <de...@gmail.com>.
Thanks Zhu, That was it. Now works great!
On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com
> wrote:
> The problem is "optional Gender gender = 3;". The generated class "Gender"
> is a trait, and Spark cannot know how to create a trait so it's not
> supported. You can define your own class that is supported by the SQL Encoder, and
> convert this generated class to the new class in `parseLine`.
>
> On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandeshyla@gmail.com
> > wrote:
>
>> Ryan,
>>
>> I just wanted to provide more info. Here is my .proto file which is the
>> basis for generating the Person class. Thanks.
>>
>> option java_package = "com.example.protos";
>> enum Gender {
>> MALE = 1;
>> FEMALE = 2;
>> }
>> message Address {
>> optional string street = 1;
>> optional string city = 2;
>> }
>> message Person {
>> optional string name = 1;
>> optional int32 age = 2;
>> optional Gender gender = 3;
>> repeated string tags = 4;
>> repeated Address addresses = 5;
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <
>> deshpandeshyla@gmail.com> wrote:
>>
>>> *Thanks for the response. Following is the Person class..*
>>>
>>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>>> // Do not edit!
>>> //
>>> // Protofile syntax: PROTO2
>>>
>>> package com.example.protos.demo
>>>
>>>
>>>
>>> @SerialVersionUID(0L)
>>> final case class Person(
>>> name: scala.Option[String] = None,
>>> age: scala.Option[Int] = None,
>>> gender: scala.Option[com.example.protos.demo.Gender] = None,
>>> tags: scala.collection.Seq[String] = Nil,
>>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>>> ) extends com.trueaccord.scalapb.GeneratedMessage with com.trueaccord.scalapb.Message[Person] with com.trueaccord.lenses.Updatable[Person] {
>>> @transient
>>> private[this] var __serializedSizeCachedValue: Int = 0
>>> private[this] def __computeSerializedValue(): Int = {
>>> var __size = 0
>>> if (name.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>> if (age.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>> if (gender.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>> tags.foreach(tags => __size += com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>> addresses.foreach(addresses => __size += 1 + com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) + addresses.serializedSize)
>>> __size
>>> }
>>> final override def serializedSize: Int = {
>>> var read = __serializedSizeCachedValue
>>> if (read == 0) {
>>> read = __computeSerializedValue()
>>> __serializedSizeCachedValue = read
>>> }
>>> read
>>> }
>>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>>> name.foreach { __v =>
>>> _output__.writeString(1, __v)
>>> };
>>> age.foreach { __v =>
>>> _output__.writeInt32(2, __v)
>>> };
>>> gender.foreach { __v =>
>>> _output__.writeEnum(3, __v.value)
>>> };
>>> tags.foreach { __v =>
>>> _output__.writeString(4, __v)
>>> };
>>> addresses.foreach { __v =>
>>> _output__.writeTag(5, 2)
>>> _output__.writeUInt32NoTag(__v.serializedSize)
>>> __v.writeTo(_output__)
>>> };
>>> }
>>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): com.example.protos.demo.Person = {
>>> var __name = this.name
>>> var __age = this.age
>>> var __gender = this.gender
>>> val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= this.tags)
>>> val __addresses = (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] ++= this.addresses)
>>> var _done__ = false
>>> while (!_done__) {
>>> val _tag__ = _input__.readTag()
>>> _tag__ match {
>>> case 0 => _done__ = true
>>> case 10 =>
>>> __name = Some(_input__.readString())
>>> case 16 =>
>>> __age = Some(_input__.readInt32())
>>> case 24 =>
>>> __gender = Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>>> case 34 =>
>>> __tags += _input__.readString()
>>> case 42 =>
>>> __addresses += com.trueaccord.scalapb.LiteParser.readMessage(_input__, com.example.protos.demo.Address.defaultInstance)
>>> case tag => _input__.skipField(tag)
>>> }
>>> }
>>> com.example.protos.demo.Person(
>>> name = __name,
>>> age = __age,
>>> gender = __gender,
>>> tags = __tags.result(),
>>> addresses = __addresses.result()
>>> )
>>> }
>>> def getName: String = name.getOrElse("")
>>> def clearName: Person = copy(name = None)
>>> def withName(__v: String): Person = copy(name = Some(__v))
>>> def getAge: Int = age.getOrElse(0)
>>> def clearAge: Person = copy(age = None)
>>> def withAge(__v: Int): Person = copy(age = Some(__v))
>>> def getGender: com.example.protos.demo.Gender = gender.getOrElse(com.example.protos.demo.Gender.MALE)
>>> def clearGender: Person = copy(gender = None)
>>> def withGender(__v: com.example.protos.demo.Gender): Person = copy(gender = Some(__v))
>>> def clearTags = copy(tags = scala.collection.Seq.empty)
>>> def addTags(__vs: String*): Person = addAllTags(__vs)
>>> def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags ++ __vs)
>>> def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v)
>>> def clearAddresses = copy(addresses = scala.collection.Seq.empty)
>>> def addAddresses(__vs: com.example.protos.demo.Address*): Person = addAllAddresses(__vs)
>>> def addAllAddresses(__vs: TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = addresses ++ __vs)
>>> def withAddresses(__v: scala.collection.Seq[com.example.protos.demo.Address]): Person = copy(addresses = __v)
>>> def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): scala.Any = {
>>> __field.getNumber match {
>>> case 1 => name.getOrElse(null)
>>> case 2 => age.getOrElse(null)
>>> case 3 => gender.map(_.valueDescriptor).getOrElse(null)
>>> case 4 => tags
>>> case 5 => addresses
>>> }
>>> }
>>> override def toString: String = com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
>>> def companion = com.example.protos.demo.Person
>>> }
>>>
>>> object Person extends com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] {
>>> implicit def messageCompanion: com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] = this
>>> def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): com.example.protos.demo.Person = {
>>> require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), "FieldDescriptor does not match message type.")
>>> val __fields = descriptor.getFields
>>> com.example.protos.demo.Person(
>>> __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
>>> __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
>>> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e => com.example.protos.demo.Gender.fromValue(__e.getNumber)),
>>> __fieldsMap.getOrElse(__fields.get(3), Nil).asInstanceOf[scala.collection.Seq[String]],
>>> __fieldsMap.getOrElse(__fields.get(4), Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
>>> )
>>> }
>>> def descriptor: com.google.protobuf.Descriptors.Descriptor = DemoProto.descriptor.getMessageTypes.get(1)
>>> def messageCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
>>> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
>>> var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
>>> __field.getNumber match {
>>> case 5 => __out = com.example.protos.demo.Address
>>> }
>>> __out
>>> }
>>> def enumCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
>>> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
>>> __field.getNumber match {
>>> case 3 => com.example.protos.demo.Gender
>>> }
>>> }
>>> lazy val defaultInstance = com.example.protos.demo.Person(
>>> )
>>> implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) extends com.trueaccord.lenses.ObjectLens[UpperPB, com.example.protos.demo.Person](_l) {
>>> def name: com.trueaccord.lenses.Lens[UpperPB, String] = field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
>>> def optionalName: com.trueaccord.lenses.Lens[UpperPB, scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
>>> def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
>>> def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
>>> def gender: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => c_.copy(gender = Some(f_)))
>>> def optionalGender: com.trueaccord.lenses.Lens[UpperPB, scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => c_.copy(gender = f_))
>>> def tags: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_))
>>> def addresses: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[com.example.protos.demo.Address]] = field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
>>> }
>>> final val NAME_FIELD_NUMBER = 1
>>> final val AGE_FIELD_NUMBER = 2
>>> final val GENDER_FIELD_NUMBER = 3
>>> final val TAGS_FIELD_NUMBER = 4
>>> final val ADDRESSES_FIELD_NUMBER = 5
>>> }
>>>
>>>
>>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> Could you provide the Person class?
>>>>
>>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>>>> deshpandeshyla@gmail.com> wrote:
>>>>
>>>>> I am using 2.11.8. Thanks
>>>>>
>>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>>>> some known race conditions in reflection and the Scala community doesn't
>>>>>> have a plan to fix it (http://docs.scala-lang.org/ov
>>>>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it
>>>>>> is upgrading to Scala 2.11.
>>>>>>
>>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>>>> deshpandeshyla@gmail.com> wrote:
>>>>>>
>>>>>>> I am using protobuf to encode. This may not be related to the new
>>>>>>> release issue....
>>>>>>>
>>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none>
>>>>>>> is not a term
>>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.sca
>>>>>>> la:199)
>>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>>>>>> ymbols.scala:84)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>>>>>> tParams(ScalaReflection.scala:811)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>>>>>> ms(ScalaReflection.scala:39)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>>>>>> ructorParameters(ScalaReflection.scala:800)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>>>>>> rParameters(ScalaReflection.scala:39)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>>> ion.scala:582)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>>> ion.scala:460)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>>>> ly(ScalaReflection.scala:592)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>>>> ly(ScalaReflection.scala:583)
>>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>>>> aversableLike.scala:252)
>>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>>>> aversableLike.scala:252)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>>>>>> ke.scala:252)
>>>>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>>> ion.scala:583)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>>>>>> (ScalaReflection.scala:425)
>>>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>>>>>> ply(ExpressionEncoder.scala:61)
>>>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>>>>>> cits.scala:47)
>>>>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>>>>> ssorImpl.java:62)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>>>> thodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j
>>>>>>> ava:147)
>>>>>>>
>>>>>>> The following is my code ...
>>>>>>>
>>>>>>> object PersonConsumer {
>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>> import com.trueaccord.scalapb.spark._
>>>>>>> import org.apache.spark.sql.{SQLContext, SparkSession}
>>>>>>> import com.example.protos.demo._
>>>>>>>
>>>>>>> def main(args : Array[String]) {
>>>>>>>
>>>>>>> def parseLine(s: String): Person =
>>>>>>> Person.parseFrom(
>>>>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>>>>
>>>>>>> val spark = SparkSession.builder.
>>>>>>> master("local")
>>>>>>> .appName("spark session example")
>>>>>>> .getOrCreate()
>>>>>>>
>>>>>>> import spark.implicits._
>>>>>>>
>>>>>>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>>>>>
>>>>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>>>>
>>>>>>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>>>>>
>>>>>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>>>>>
>>>>>>> val query = ds4.writeStream
>>>>>>> .outputMode("append")
>>>>>>> .format("console")
>>>>>>> .start()
>>>>>>> query.awaitTermination()
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
The problem is "optional Gender gender = 3;". The generated class "Gender"
is a trait, and Spark cannot know how to create a trait so it's not
supported. You can define your own class that is supported by the SQL Encoder, and
convert this generated class to the new class in `parseLine`.
On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <de...@gmail.com>
wrote:
> Ryan,
>
> I just wanted to provide more info. Here is my .proto file which is the
> basis for generating the Person class. Thanks.
>
> option java_package = "com.example.protos";
> enum Gender {
> MALE = 1;
> FEMALE = 2;
> }
> message Address {
> optional string street = 1;
> optional string city = 2;
> }
> message Person {
> optional string name = 1;
> optional int32 age = 2;
> optional Gender gender = 3;
> repeated string tags = 4;
> repeated Address addresses = 5;
> }
>
>
> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <deshpandeshyla@gmail.com
> > wrote:
>
>> *Thanks for the response. Following is the Person class..*
>>
>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>> // Do not edit!
>> //
>> // Protofile syntax: PROTO2
>>
>> package com.example.protos.demo
>>
>>
>>
>> @SerialVersionUID(0L)
>> final case class Person(
>> name: scala.Option[String] = None,
>> age: scala.Option[Int] = None,
>> gender: scala.Option[com.example.protos.demo.Gender] = None,
>> tags: scala.collection.Seq[String] = Nil,
>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>> ) extends com.trueaccord.scalapb.GeneratedMessage with com.trueaccord.scalapb.Message[Person] with com.trueaccord.lenses.Updatable[Person] {
>> @transient
>> private[this] var __serializedSizeCachedValue: Int = 0
>> private[this] def __computeSerializedValue(): Int = {
>> var __size = 0
>> if (name.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>> if (age.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>> if (gender.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>> tags.foreach(tags => __size += com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>> addresses.foreach(addresses => __size += 1 + com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) + addresses.serializedSize)
>> __size
>> }
>> final override def serializedSize: Int = {
>> var read = __serializedSizeCachedValue
>> if (read == 0) {
>> read = __computeSerializedValue()
>> __serializedSizeCachedValue = read
>> }
>> read
>> }
>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>> name.foreach { __v =>
>> _output__.writeString(1, __v)
>> };
>> age.foreach { __v =>
>> _output__.writeInt32(2, __v)
>> };
>> gender.foreach { __v =>
>> _output__.writeEnum(3, __v.value)
>> };
>> tags.foreach { __v =>
>> _output__.writeString(4, __v)
>> };
>> addresses.foreach { __v =>
>> _output__.writeTag(5, 2)
>> _output__.writeUInt32NoTag(__v.serializedSize)
>> __v.writeTo(_output__)
>> };
>> }
>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): com.example.protos.demo.Person = {
>> var __name = this.name
>> var __age = this.age
>> var __gender = this.gender
>> val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= this.tags)
>> val __addresses = (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] ++= this.addresses)
>> var _done__ = false
>> while (!_done__) {
>> val _tag__ = _input__.readTag()
>> _tag__ match {
>> case 0 => _done__ = true
>> case 10 =>
>> __name = Some(_input__.readString())
>> case 16 =>
>> __age = Some(_input__.readInt32())
>> case 24 =>
>> __gender = Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>> case 34 =>
>> __tags += _input__.readString()
>> case 42 =>
>> __addresses += com.trueaccord.scalapb.LiteParser.readMessage(_input__, com.example.protos.demo.Address.defaultInstance)
>> case tag => _input__.skipField(tag)
>> }
>> }
>> com.example.protos.demo.Person(
>> name = __name,
>> age = __age,
>> gender = __gender,
>> tags = __tags.result(),
>> addresses = __addresses.result()
>> )
>> }
>> def getName: String = name.getOrElse("")
>> def clearName: Person = copy(name = None)
>> def withName(__v: String): Person = copy(name = Some(__v))
>> def getAge: Int = age.getOrElse(0)
>> def clearAge: Person = copy(age = None)
>> def withAge(__v: Int): Person = copy(age = Some(__v))
>> def getGender: com.example.protos.demo.Gender = gender.getOrElse(com.example.protos.demo.Gender.MALE)
>> def clearGender: Person = copy(gender = None)
>> def withGender(__v: com.example.protos.demo.Gender): Person = copy(gender = Some(__v))
>> def clearTags = copy(tags = scala.collection.Seq.empty)
>> def addTags(__vs: String*): Person = addAllTags(__vs)
>> def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags ++ __vs)
>> def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v)
>> def clearAddresses = copy(addresses = scala.collection.Seq.empty)
>> def addAddresses(__vs: com.example.protos.demo.Address*): Person = addAllAddresses(__vs)
>> def addAllAddresses(__vs: TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = addresses ++ __vs)
>> def withAddresses(__v: scala.collection.Seq[com.example.protos.demo.Address]): Person = copy(addresses = __v)
>> def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): scala.Any = {
>> __field.getNumber match {
>> case 1 => name.getOrElse(null)
>> case 2 => age.getOrElse(null)
>> case 3 => gender.map(_.valueDescriptor).getOrElse(null)
>> case 4 => tags
>> case 5 => addresses
>> }
>> }
>> override def toString: String = com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
>> def companion = com.example.protos.demo.Person
>> }
>>
>> object Person extends com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] {
>> implicit def messageCompanion: com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] = this
>> def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): com.example.protos.demo.Person = {
>> require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), "FieldDescriptor does not match message type.")
>> val __fields = descriptor.getFields
>> com.example.protos.demo.Person(
>> __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
>> __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
>> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e => com.example.protos.demo.Gender.fromValue(__e.getNumber)),
>> __fieldsMap.getOrElse(__fields.get(3), Nil).asInstanceOf[scala.collection.Seq[String]],
>> __fieldsMap.getOrElse(__fields.get(4), Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
>> )
>> }
>> def descriptor: com.google.protobuf.Descriptors.Descriptor = DemoProto.descriptor.getMessageTypes.get(1)
>> def messageCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
>> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
>> var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
>> __field.getNumber match {
>> case 5 => __out = com.example.protos.demo.Address
>> }
>> __out
>> }
>> def enumCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
>> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
>> __field.getNumber match {
>> case 3 => com.example.protos.demo.Gender
>> }
>> }
>> lazy val defaultInstance = com.example.protos.demo.Person(
>> )
>> implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) extends com.trueaccord.lenses.ObjectLens[UpperPB, com.example.protos.demo.Person](_l) {
>> def name: com.trueaccord.lenses.Lens[UpperPB, String] = field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
>> def optionalName: com.trueaccord.lenses.Lens[UpperPB, scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
>> def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
>> def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
>> def gender: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => c_.copy(gender = Some(f_)))
>> def optionalGender: com.trueaccord.lenses.Lens[UpperPB, scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => c_.copy(gender = f_))
>> def tags: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_))
>> def addresses: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[com.example.protos.demo.Address]] = field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
>> }
>> final val NAME_FIELD_NUMBER = 1
>> final val AGE_FIELD_NUMBER = 2
>> final val GENDER_FIELD_NUMBER = 3
>> final val TAGS_FIELD_NUMBER = 4
>> final val ADDRESSES_FIELD_NUMBER = 5
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Could you provide the Person class?
>>>
>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>>> deshpandeshyla@gmail.com> wrote:
>>>
>>>> I am using 2.11.8. Thanks
>>>>
>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>>> shixiong@databricks.com> wrote:
>>>>
>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>>> some known race conditions in reflection and the Scala community doesn't
>>>>> have a plan to fix it (http://docs.scala-lang.org/ov
>>>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it
>>>>> is upgrading to Scala 2.11.
>>>>>
>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>>> deshpandeshyla@gmail.com> wrote:
>>>>>
>>>>>> I am using protobuf to encode. This may not be related to the new
>>>>>> release issue....
>>>>>>
>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is
>>>>>> not a term
>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.sca
>>>>>> la:199)
>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>>>>> ymbols.scala:84)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>>>>> tParams(ScalaReflection.scala:811)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>>>>> ms(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>>>>> ructorParameters(ScalaReflection.scala:800)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>>>>> rParameters(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>> ion.scala:582)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>> ion.scala:460)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>>> ly(ScalaReflection.scala:592)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>>> ly(ScalaReflection.scala:583)
>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>>> aversableLike.scala:252)
>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>>> aversableLike.scala:252)
>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>>>>> ke.scala:252)
>>>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>>> ion.scala:583)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>>>>> (ScalaReflection.scala:425)
>>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>>>>> ply(ExpressionEncoder.scala:61)
>>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>>>>> cits.scala:47)
>>>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>>>> ssorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>>> thodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j
>>>>>> ava:147)
>>>>>>
>>>>>> The following is my code ...
>>>>>>
>>>>>> object PersonConsumer {
>>>>>> import org.apache.spark.rdd.RDD
>>>>>> import com.trueaccord.scalapb.spark._
>>>>>> import org.apache.spark.sql.{SQLContext, SparkSession}
>>>>>> import com.example.protos.demo._
>>>>>>
>>>>>> def main(args : Array[String]) {
>>>>>>
>>>>>> def parseLine(s: String): Person =
>>>>>> Person.parseFrom(
>>>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>>>
>>>>>> val spark = SparkSession.builder.
>>>>>> master("local")
>>>>>> .appName("spark session example")
>>>>>> .getOrCreate()
>>>>>>
>>>>>> import spark.implicits._
>>>>>>
>>>>>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>>>>
>>>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>>>
>>>>>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>>>>
>>>>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>>>>
>>>>>> val query = ds4.writeStream
>>>>>> .outputMode("append")
>>>>>> .format("console")
>>>>>> .start()
>>>>>> query.awaitTermination()
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by shyla deshpande <de...@gmail.com>.
Ryan,
I just wanted to provide more info. Here is my .proto file which is the
basis for generating the Person class. Thanks.
option java_package = "com.example.protos";
enum Gender {
MALE = 1;
FEMALE = 2;
}
message Address {
optional string street = 1;
optional string city = 2;
}
message Person {
optional string name = 1;
optional int32 age = 2;
optional Gender gender = 3;
repeated string tags = 4;
repeated Address addresses = 5;
}
On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <de...@gmail.com>
wrote:
> *Thanks for the response. Following is the Person class..*
>
> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
> // Do not edit!
> //
> // Protofile syntax: PROTO2
>
> package com.example.protos.demo
>
>
>
> @SerialVersionUID(0L)
> final case class Person(
> name: scala.Option[String] = None,
> age: scala.Option[Int] = None,
> gender: scala.Option[com.example.protos.demo.Gender] = None,
> tags: scala.collection.Seq[String] = Nil,
> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
> ) extends com.trueaccord.scalapb.GeneratedMessage with com.trueaccord.scalapb.Message[Person] with com.trueaccord.lenses.Updatable[Person] {
> @transient
> private[this] var __serializedSizeCachedValue: Int = 0
> private[this] def __computeSerializedValue(): Int = {
> var __size = 0
> if (name.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
> if (age.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
> if (gender.isDefined) { __size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
> tags.foreach(tags => __size += com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
> addresses.foreach(addresses => __size += 1 + com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) + addresses.serializedSize)
> __size
> }
> final override def serializedSize: Int = {
> var read = __serializedSizeCachedValue
> if (read == 0) {
> read = __computeSerializedValue()
> __serializedSizeCachedValue = read
> }
> read
> }
> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
> name.foreach { __v =>
> _output__.writeString(1, __v)
> };
> age.foreach { __v =>
> _output__.writeInt32(2, __v)
> };
> gender.foreach { __v =>
> _output__.writeEnum(3, __v.value)
> };
> tags.foreach { __v =>
> _output__.writeString(4, __v)
> };
> addresses.foreach { __v =>
> _output__.writeTag(5, 2)
> _output__.writeUInt32NoTag(__v.serializedSize)
> __v.writeTo(_output__)
> };
> }
> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): com.example.protos.demo.Person = {
> var __name = this.name
> var __age = this.age
> var __gender = this.gender
> val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= this.tags)
> val __addresses = (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] ++= this.addresses)
> var _done__ = false
> while (!_done__) {
> val _tag__ = _input__.readTag()
> _tag__ match {
> case 0 => _done__ = true
> case 10 =>
> __name = Some(_input__.readString())
> case 16 =>
> __age = Some(_input__.readInt32())
> case 24 =>
> __gender = Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
> case 34 =>
> __tags += _input__.readString()
> case 42 =>
> __addresses += com.trueaccord.scalapb.LiteParser.readMessage(_input__, com.example.protos.demo.Address.defaultInstance)
> case tag => _input__.skipField(tag)
> }
> }
> com.example.protos.demo.Person(
> name = __name,
> age = __age,
> gender = __gender,
> tags = __tags.result(),
> addresses = __addresses.result()
> )
> }
> def getName: String = name.getOrElse("")
> def clearName: Person = copy(name = None)
> def withName(__v: String): Person = copy(name = Some(__v))
> def getAge: Int = age.getOrElse(0)
> def clearAge: Person = copy(age = None)
> def withAge(__v: Int): Person = copy(age = Some(__v))
> def getGender: com.example.protos.demo.Gender = gender.getOrElse(com.example.protos.demo.Gender.MALE)
> def clearGender: Person = copy(gender = None)
> def withGender(__v: com.example.protos.demo.Gender): Person = copy(gender = Some(__v))
> def clearTags = copy(tags = scala.collection.Seq.empty)
> def addTags(__vs: String*): Person = addAllTags(__vs)
> def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags ++ __vs)
> def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v)
> def clearAddresses = copy(addresses = scala.collection.Seq.empty)
> def addAddresses(__vs: com.example.protos.demo.Address*): Person = addAllAddresses(__vs)
> def addAllAddresses(__vs: TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = addresses ++ __vs)
> def withAddresses(__v: scala.collection.Seq[com.example.protos.demo.Address]): Person = copy(addresses = __v)
> def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): scala.Any = {
> __field.getNumber match {
> case 1 => name.getOrElse(null)
> case 2 => age.getOrElse(null)
> case 3 => gender.map(_.valueDescriptor).getOrElse(null)
> case 4 => tags
> case 5 => addresses
> }
> }
> override def toString: String = com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
> def companion = com.example.protos.demo.Person
> }
>
> object Person extends com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] {
> implicit def messageCompanion: com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] = this
> def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): com.example.protos.demo.Person = {
> require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), "FieldDescriptor does not match message type.")
> val __fields = descriptor.getFields
> com.example.protos.demo.Person(
> __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
> __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e => com.example.protos.demo.Gender.fromValue(__e.getNumber)),
> __fieldsMap.getOrElse(__fields.get(3), Nil).asInstanceOf[scala.collection.Seq[String]],
> __fieldsMap.getOrElse(__fields.get(4), Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
> )
> }
> def descriptor: com.google.protobuf.Descriptors.Descriptor = DemoProto.descriptor.getMessageTypes.get(1)
> def messageCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
> var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
> __field.getNumber match {
> case 5 => __out = com.example.protos.demo.Address
> }
> __out
> }
> def enumCompanionForField(__field: com.google.protobuf.Descriptors.FieldDescriptor): com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
> require(__field.getContainingType() == descriptor, "FieldDescriptor does not match message type.")
> __field.getNumber match {
> case 3 => com.example.protos.demo.Gender
> }
> }
> lazy val defaultInstance = com.example.protos.demo.Person(
> )
> implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) extends com.trueaccord.lenses.ObjectLens[UpperPB, com.example.protos.demo.Person](_l) {
> def name: com.trueaccord.lenses.Lens[UpperPB, String] = field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
> def optionalName: com.trueaccord.lenses.Lens[UpperPB, scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
> def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
> def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
> def gender: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => c_.copy(gender = Some(f_)))
> def optionalGender: com.trueaccord.lenses.Lens[UpperPB, scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => c_.copy(gender = f_))
> def tags: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_))
> def addresses: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[com.example.protos.demo.Address]] = field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
> }
> final val NAME_FIELD_NUMBER = 1
> final val AGE_FIELD_NUMBER = 2
> final val GENDER_FIELD_NUMBER = 3
> final val TAGS_FIELD_NUMBER = 4
> final val ADDRESSES_FIELD_NUMBER = 5
> }
>
>
> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Could you provide the Person class?
>>
>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>> deshpandeshyla@gmail.com> wrote:
>>
>>> I am using 2.11.8. Thanks
>>>
>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>> some known race conditions in reflection and the Scala community doesn't
>>>> have plan to fix it (http://docs.scala-lang.org/ov
>>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it
>>>> is upgrading to Scala 2.11.
>>>>
>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>> deshpandeshyla@gmail.com> wrote:
>>>>
>>>>> I am using protobuf to encode. This may not be related to the new
>>>>> release issue....
>>>>>
>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is
>>>>> not a term
>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>>>> ymbols.scala:84)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>>>> tParams(ScalaReflection.scala:811)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>>>> ms(ScalaReflection.scala:39)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>>>> ructorParameters(ScalaReflection.scala:800)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>>>> rParameters(ScalaReflection.scala:39)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>> ion.scala:582)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>> ion.scala:460)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>> ly(ScalaReflection.scala:592)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>>> ly(ScalaReflection.scala:583)
>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>> aversableLike.scala:252)
>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>>> aversableLike.scala:252)
>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>>>> ke.scala:252)
>>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>>> ion.scala:583)
>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>>>> (ScalaReflection.scala:425)
>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>>>> ply(ExpressionEncoder.scala:61)
>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>>>> cits.scala:47)
>>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>>> ssorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>> thodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j
>>>>> ava:147)
>>>>>
>>>>> The following is my code ...
>>>>>
>>>>> object PersonConsumer {
>>>>> import org.apache.spark.rdd.RDD
>>>>> import com.trueaccord.scalapb.spark._
>>>>> import org.apache.spark.sql.{SQLContext, SparkSession}
>>>>> import com.example.protos.demo._
>>>>>
>>>>> def main(args : Array[String]) {
>>>>>
>>>>> def parseLine(s: String): Person =
>>>>> Person.parseFrom(
>>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>>
>>>>> val spark = SparkSession.builder.
>>>>> master("local")
>>>>> .appName("spark session example")
>>>>> .getOrCreate()
>>>>>
>>>>> import spark.implicits._
>>>>>
>>>>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>>>
>>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>>
>>>>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>>>
>>>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>>>
>>>>> val query = ds4.writeStream
>>>>> .outputMode("append")
>>>>> .format("console")
>>>>> .start()
>>>>> query.awaitTermination()
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by shyla deshpande <de...@gmail.com>.
*Thanks for the response. Following is the Person class..*
// Generated by the Scala Plugin for the Protocol Buffer Compiler.
// Do not edit!
//
// Protofile syntax: PROTO2
package com.example.protos.demo
@SerialVersionUID(0L)
final case class Person(
name: scala.Option[String] = None,
age: scala.Option[Int] = None,
gender: scala.Option[com.example.protos.demo.Gender] = None,
tags: scala.collection.Seq[String] = Nil,
addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
) extends com.trueaccord.scalapb.GeneratedMessage with
com.trueaccord.scalapb.Message[Person] with
com.trueaccord.lenses.Updatable[Person] {
@transient
private[this] var __serializedSizeCachedValue: Int = 0
private[this] def __computeSerializedValue(): Int = {
var __size = 0
if (name.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
if (age.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
if (gender.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeEnumSize(3,
gender.get.value) }
tags.foreach(tags => __size +=
com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
addresses.foreach(addresses => __size += 1 +
com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
+ addresses.serializedSize)
__size
}
final override def serializedSize: Int = {
var read = __serializedSizeCachedValue
if (read == 0) {
read = __computeSerializedValue()
__serializedSizeCachedValue = read
}
read
}
def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
name.foreach { __v =>
_output__.writeString(1, __v)
};
age.foreach { __v =>
_output__.writeInt32(2, __v)
};
gender.foreach { __v =>
_output__.writeEnum(3, __v.value)
};
tags.foreach { __v =>
_output__.writeString(4, __v)
};
addresses.foreach { __v =>
_output__.writeTag(5, 2)
_output__.writeUInt32NoTag(__v.serializedSize)
__v.writeTo(_output__)
};
}
def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream):
com.example.protos.demo.Person = {
var __name = this.name
var __age = this.age
var __gender = this.gender
val __tags =
(scala.collection.immutable.Vector.newBuilder[String] ++= this.tags)
val __addresses =
(scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
++= this.addresses)
var _done__ = false
while (!_done__) {
val _tag__ = _input__.readTag()
_tag__ match {
case 0 => _done__ = true
case 10 =>
__name = Some(_input__.readString())
case 16 =>
__age = Some(_input__.readInt32())
case 24 =>
__gender =
Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
case 34 =>
__tags += _input__.readString()
case 42 =>
__addresses +=
com.trueaccord.scalapb.LiteParser.readMessage(_input__,
com.example.protos.demo.Address.defaultInstance)
case tag => _input__.skipField(tag)
}
}
com.example.protos.demo.Person(
name = __name,
age = __age,
gender = __gender,
tags = __tags.result(),
addresses = __addresses.result()
)
}
def getName: String = name.getOrElse("")
def clearName: Person = copy(name = None)
def withName(__v: String): Person = copy(name = Some(__v))
def getAge: Int = age.getOrElse(0)
def clearAge: Person = copy(age = None)
def withAge(__v: Int): Person = copy(age = Some(__v))
def getGender: com.example.protos.demo.Gender =
gender.getOrElse(com.example.protos.demo.Gender.MALE)
def clearGender: Person = copy(gender = None)
def withGender(__v: com.example.protos.demo.Gender): Person =
copy(gender = Some(__v))
def clearTags = copy(tags = scala.collection.Seq.empty)
def addTags(__vs: String*): Person = addAllTags(__vs)
def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags
= tags ++ __vs)
def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v)
def clearAddresses = copy(addresses = scala.collection.Seq.empty)
def addAddresses(__vs: com.example.protos.demo.Address*): Person =
addAllAddresses(__vs)
def addAllAddresses(__vs:
TraversableOnce[com.example.protos.demo.Address]): Person =
copy(addresses = addresses ++ __vs)
def withAddresses(__v:
scala.collection.Seq[com.example.protos.demo.Address]): Person =
copy(addresses = __v)
def getField(__field:
com.google.protobuf.Descriptors.FieldDescriptor): scala.Any = {
__field.getNumber match {
case 1 => name.getOrElse(null)
case 2 => age.getOrElse(null)
case 3 => gender.map(_.valueDescriptor).getOrElse(null)
case 4 => tags
case 5 => addresses
}
}
override def toString: String =
com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
def companion = com.example.protos.demo.Person
}
object Person extends
com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
{
implicit def messageCompanion:
com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
= this
def fromFieldsMap(__fieldsMap:
scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor,
scala.Any]): com.example.protos.demo.Person = {
require(__fieldsMap.keys.forall(_.getContainingType() ==
descriptor), "FieldDescriptor does not match message type.")
val __fields = descriptor.getFields
com.example.protos.demo.Person(
__fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
__fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
__fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e
=> com.example.protos.demo.Gender.fromValue(__e.getNumber)),
__fieldsMap.getOrElse(__fields.get(3),
Nil).asInstanceOf[scala.collection.Seq[String]],
__fieldsMap.getOrElse(__fields.get(4),
Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
)
}
def descriptor: com.google.protobuf.Descriptors.Descriptor =
DemoProto.descriptor.getMessageTypes.get(1)
def messageCompanionForField(__field:
com.google.protobuf.Descriptors.FieldDescriptor):
com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
require(__field.getContainingType() == descriptor,
"FieldDescriptor does not match message type.")
var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
__field.getNumber match {
case 5 => __out = com.example.protos.demo.Address
}
__out
}
def enumCompanionForField(__field:
com.google.protobuf.Descriptors.FieldDescriptor):
com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
require(__field.getContainingType() == descriptor,
"FieldDescriptor does not match message type.")
__field.getNumber match {
case 3 => com.example.protos.demo.Gender
}
}
lazy val defaultInstance = com.example.protos.demo.Person(
)
implicit class PersonLens[UpperPB](_l:
com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person])
extends com.trueaccord.lenses.ObjectLens[UpperPB,
com.example.protos.demo.Person](_l) {
def name: com.trueaccord.lenses.Lens[UpperPB, String] =
field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
def optionalName: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
def age: com.trueaccord.lenses.Lens[UpperPB, Int] =
field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
def optionalAge: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
def gender: com.trueaccord.lenses.Lens[UpperPB,
com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) =>
c_.copy(gender = Some(f_)))
def optionalGender: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_,
f_) => c_.copy(gender = f_))
def tags: com.trueaccord.lenses.Lens[UpperPB,
scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags
= f_))
def addresses: com.trueaccord.lenses.Lens[UpperPB,
scala.collection.Seq[com.example.protos.demo.Address]] =
field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
}
final val NAME_FIELD_NUMBER = 1
final val AGE_FIELD_NUMBER = 2
final val GENDER_FIELD_NUMBER = 3
final val TAGS_FIELD_NUMBER = 4
final val ADDRESSES_FIELD_NUMBER = 5
}
On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com
> wrote:
> Could you provide the Person class?
>
> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <deshpandeshyla@gmail.com
> > wrote:
>
>> I am using 2.11.8. Thanks
>>
>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>>> known race conditions in reflection and the Scala community doesn't have
>>> plan to fix it (http://docs.scala-lang.org/ov
>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it is
>>> upgrading to Scala 2.11.
>>>
>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>> deshpandeshyla@gmail.com> wrote:
>>>
>>>> I am using protobuf to encode. This may not be related to the new
>>>> release issue....
>>>>
>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is
>>>> not a term
>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>>> ymbols.scala:84)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>>> tParams(ScalaReflection.scala:811)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>>> ms(ScalaReflection.scala:39)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>>> ructorParameters(ScalaReflection.scala:800)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>>> rParameters(ScalaReflection.scala:39)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>> ion.scala:582)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>> ion.scala:460)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>> ly(ScalaReflection.scala:592)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>>> ly(ScalaReflection.scala:583)
>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>> aversableLike.scala:252)
>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>>> aversableLike.scala:252)
>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>>> ke.scala:252)
>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>>> ion.scala:583)
>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>>> (ScalaReflection.scala:425)
>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>>> ply(ExpressionEncoder.scala:61)
>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>>> cits.scala:47)
>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>> ssorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>> thodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>
>>>> The following is my code ...
>>>>
>>>> object PersonConsumer {
>>>> import org.apache.spark.rdd.RDD
>>>> import com.trueaccord.scalapb.spark._
>>>> import org.apache.spark.sql.{SQLContext, SparkSession}
>>>> import com.example.protos.demo._
>>>>
>>>> def main(args : Array[String]) {
>>>>
>>>> def parseLine(s: String): Person =
>>>> Person.parseFrom(
>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>
>>>> val spark = SparkSession.builder.
>>>> master("local")
>>>> .appName("spark session example")
>>>> .getOrCreate()
>>>>
>>>> import spark.implicits._
>>>>
>>>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>>
>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>
>>>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>>
>>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>>
>>>> val query = ds4.writeStream
>>>> .outputMode("append")
>>>> .format("console")
>>>> .start()
>>>> query.awaitTermination()
>>>> }
>>>> }
>>>>
>>>>
>>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you provide the Person class?
On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <de...@gmail.com>
wrote:
> I am using 2.11.8. Thanks
>
> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>> known race conditions in reflection and the Scala community doesn't have
>> plan to fix it (http://docs.scala-lang.org/ov
>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it is
>> upgrading to Scala 2.11.
>>
>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>> deshpandeshyla@gmail.com> wrote:
>>
>>> I am using protobuf to encode. This may not be related to the new
>>> release issue....
>>>
>>> Exception in thread "main" scala.ScalaReflectionException: <none> is
>>> not a term
>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>> ymbols.scala:84)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>> tParams(ScalaReflection.scala:811)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>> ms(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>> ructorParameters(ScalaReflection.scala:800)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>> rParameters(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:582)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:460)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>> ly(ScalaReflection.scala:592)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>> ly(ScalaReflection.scala:583)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>> aversableLike.scala:252)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>> aversableLike.scala:252)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>> ke.scala:252)
>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:583)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>> (ScalaReflection.scala:425)
>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>> ply(ExpressionEncoder.scala:61)
>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>> cits.scala:47)
>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>> at PersonConsumer.main(PersonConsumer.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>
>>> The following is my code ...
>>>
>>> object PersonConsumer {
>>> import org.apache.spark.rdd.RDD
>>> import com.trueaccord.scalapb.spark._
>>> import org.apache.spark.sql.{SQLContext, SparkSession}
>>> import com.example.protos.demo._
>>>
>>> def main(args : Array[String]) {
>>>
>>> def parseLine(s: String): Person =
>>> Person.parseFrom(
>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>
>>> val spark = SparkSession.builder.
>>> master("local")
>>> .appName("spark session example")
>>> .getOrCreate()
>>>
>>> import spark.implicits._
>>>
>>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>
>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>
>>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>
>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>
>>> val query = ds4.writeStream
>>> .outputMode("append")
>>> .format("console")
>>> .start()
>>> query.awaitTermination()
>>> }
>>> }
>>>
>>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by shyla deshpande <de...@gmail.com>.
I am using 2.11.8. Thanks
On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com
> wrote:
> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
> known race conditions in reflection and the Scala community doesn't have
> plan to fix it (http://docs.scala-lang.org/overviews/reflection/thread-
> safety.html) AFAIK, the only way to fix it is upgrading to Scala 2.11.
>
> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
> deshpandeshyla@gmail.com> wrote:
>
>> I am using protobuf to encode. This may not be related to the new release
>> issue....
>>
>> Exception in thread "main" scala.ScalaReflectionException: <none> is not
>> a term
>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(
>> Symbols.scala:84)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>> tParams(ScalaReflection.scala:811)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>> ms(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>> ructorParameters(ScalaReflection.scala:800)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>> rParameters(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:582)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:460)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>> ly(ScalaReflection.scala:592)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>> ly(ScalaReflection.scala:583)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:252)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:252)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>> ke.scala:252)
>> at scala.collection.immutable.List.flatMap(List.scala:344)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:583)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>> (ScalaReflection.scala:425)
>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>> ply(ExpressionEncoder.scala:61)
>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>> cits.scala:47)
>> at PersonConsumer$.main(PersonConsumer.scala:33)
>> at PersonConsumer.main(PersonConsumer.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> The following is my code ...
>>
>> object PersonConsumer {
>> import org.apache.spark.rdd.RDD
>> import com.trueaccord.scalapb.spark._
>> import org.apache.spark.sql.{SQLContext, SparkSession}
>> import com.example.protos.demo._
>>
>> def main(args : Array[String]) {
>>
>> def parseLine(s: String): Person =
>> Person.parseFrom(
>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>
>> val spark = SparkSession.builder.
>> master("local")
>> .appName("spark session example")
>> .getOrCreate()
>>
>> import spark.implicits._
>>
>> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>
>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>
>> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>
>> val ds4 = spark.sqlContext.sql("select name from persons")
>>
>> val query = ds4.writeStream
>> .outputMode("append")
>> .format("console")
>> .start()
>> query.awaitTermination()
>> }
>> }
>>
>>
>
Re: Spark 2.0.2 with Kafka source, Error please help!
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
known race conditions in reflection and the Scala community doesn't have a
plan to fix it (
http://docs.scala-lang.org/overviews/reflection/thread-safety.html) AFAIK,
the only way to fix it is upgrading to Scala 2.11.
On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <de...@gmail.com>
wrote:
> I am using protobuf to encode. This may not be related to the new release
> issue....
>
> Exception in thread "main" scala.ScalaReflectionException: <none> is not
> a term
> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
> at scala.reflect.internal.Symbols$SymbolContextApiImpl.
> asTerm(Symbols.scala:84)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(
> ScalaReflection.scala:811)
> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(
> ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.
> getConstructorParameters(ScalaReflection.scala:800)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> getConstructorParameters(ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:582)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:460)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:592)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:583)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:583)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> serializerFor(ScalaReflection.scala:425)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.
> apply(ExpressionEncoder.scala:61)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
> at org.apache.spark.sql.SQLImplicits.newProductEncoder(
> SQLImplicits.scala:47)
> at PersonConsumer$.main(PersonConsumer.scala:33)
> at PersonConsumer.main(PersonConsumer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> The following is my code ...
>
> object PersonConsumer {
> import org.apache.spark.rdd.RDD
> import com.trueaccord.scalapb.spark._
> import org.apache.spark.sql.{SQLContext, SparkSession}
> import com.example.protos.demo._
>
> def main(args : Array[String]) {
>
> def parseLine(s: String): Person =
> Person.parseFrom(
> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>
> val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
> import spark.implicits._
>
> val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>
> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>
> val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>
> val ds4 = spark.sqlContext.sql("select name from persons")
>
> val query = ds4.writeStream
> .outputMode("append")
> .format("console")
> .start()
> query.awaitTermination()
> }
> }
>
>