You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aries Kong <ar...@gmail.com> on 2014/03/07 13:52:25 UTC

Class not found in Kafka-Stream due to multi-thread without correct ClassLoader?

Hi,
	I'm trying to run a kafka-stream and get a strange exception. The
streaming is created by following code:

    	val lines = KafkaUtils.createStream[String, VtrRecord,
StringDecoder, VtrRecordDeserializer](ssc, kafkaParams.toMap,
topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2)

	'VtrRecord' is generated by protobuf in the same package,
'VtrRecordDeserializer' is a Decoder to transfom byte[] to 'VtrRecord'
as following:

	import com.aries.hawkeyes.VtrRecordProtos.VtrRecord
	class VtrRecordDeserializer(props: VerifiableProperties = null)
extends kafka.serializer.Decoder[VtrRecord] {
		override def fromBytes(bytes : Array[Byte]) : VtrRecord = {
			VtrRecord.parseFrom(bytes)
		}
	}

	When the assembly jar(build by maven-shade-plugin)  is submitted to
the Spark cluster, I get the following ClassNotFoundException
exception:

java.lang.RuntimeException: Unable to find proto buffer class
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1075)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1779)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:104)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException:
com.aries.hawkeyes.VtrRecordProtos$VtrRecord
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:186)
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
        ... 34 more

	I have checked the assembly jar on the workers with `jar -tf`,
'com.aries.hawkeyes.VtrRecordProtos$VtrRecord' is definitely there.
Also, to test whether the executor can load this class,  I have tried
'System.out.println(Class.forName("com.aries.hawkeyes.VtrRecordProtos$VtrRecord"))'
in my application and
'Thread.currentThread.getContextClassLoader.loadClass("com.aries.hawkeyes.VtrRecordProtos$VtrRecord")'
in org.apache.spark.executor.Executor.run(), both work fine without
any exception. Is this due to Kafka pasers the record in another
thread whose ClassLoader does not load the task jar? Is there some way
to make it work?

	Btw the spark cluster build in Spark 0.9.0 runs in standalone mode.

	Thanks.

Aries.K

Re: Class not found in Kafka-Stream due to multi-thread without correct ClassLoader?

Posted by n0rb3rt <jn...@gmail.com>.
Any resolution to this?

I'm new to Spark and have had success running an application locally.  But
hitting this same error when submitting it to a standalone cluster.  Not
using kafka streaming in this case, just parsing proto messages wrapped in
an avro object file.  Have read all the threads about protobuf version
incompatibilities and tried about every combination of protoc and
protobuf-java that were mentioned, to no avail. 

Here's some of the relevant code.

class Sparcules(master: String, path: String) {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster(master)
      .setAppName("Sparcules")
  )
  val people = readAvro[Person](sc, path+"people.avro",
Person.getDefaultInstance)
  val systems = readAvro[System](sc, path+"systems.avro",
System.getDefaultInstance)

  def readAvro[T: ClassTag](sparkContext: SparkContext, path: String, proto:
Message): RDD[T] = {
    val rdd = sparkContext.newAPIHadoopFile[
      AvroKey[ByteBuffer],
      NullWritable,
      AvroKeyInputFormat[ByteBuffer]](path)
    rdd.map{ row =>
      proto.newBuilderForType
        .mergeFrom(row._1.datum.array)
        .build
        .asInstanceOf[T]
    }
  }

  def person_system: RDD[(Person, System)] = {
    val person_kv = people.map{p: Person => (p.getId, p)}
    val system_kv = systems.map{s: System => (s.getPersonId, s)}
    person_kv.join(system_kv)
      .map{ case((person_id, (person, system))) =>
      (person, system)
    }
  }

}

object People {

  def main(args: Array[String]) {
    val prc = new Sparcules(args(0), args(1))
    prc.person_system.map{
      case((person, system)) => (
        (person.getName(0),
          system.getName(0))
        ,
        None
      )
    }.sortByKey(true)
     .map(_._1)
     .saveAsTextFile("person_system")
  }

}



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-not-found-in-Kafka-Stream-due-to-multi-thread-without-correct-ClassLoader-tp2398p5713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.