Posted to user@spark.apache.org by Alan Ngai <al...@opsclarity.com> on 2014/07/24 12:09:00 UTC

spark streaming actor receiver doesn't play well with kryoserializer

It looks like when you configure SparkConf to use the KryoSerializer in combination with an ActorReceiver, bad things happen.  I modified the ActorWordCount example program from

    val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)

and I get the stack trace below.  I figured it might be that Kryo doesn't know how to serialize/deserialize the actor, so I added a registrator.  I also added a default empty constructor to SampleActorReceiver just for kicks:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // SampleActorReceiver takes a type parameter, so register its erased class
    kryo.register(classOf[SampleActorReceiver[_]])
  }
}

…

case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
  def this() = this("")
  ...
}

...
    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked; same stack trace.  Any idea what's going on?  Is this a known issue, and is there a workaround?
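
For reference, the stream in the example is created with ssc.actorStream,
roughly like this (paraphrased from memory of ActorWordCount; the real code
builds the publisher URL from command-line host/port arguments):

    import akka.actor.Props
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // The by-name creator passed to Props here is presumably the
    // ActorWordCount$$anonfun$2 that shows up in the stack trace below.
    val lines = ssc.actorStream[String](
      Props(new SampleActorReceiver[String](
        "akka.tcp://test@<host>:<port>/user/FeederActor")),
      "SampleReceiver")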

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
	at akka.actor.ActorCell.create(ActorCell.scala:578)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
	at akka.dispatch.Mailbox.run(Mailbox.scala:218)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
	at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
	at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
	at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
	at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
	at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
	at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
	at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
	at akka.actor.Props.newActor(Props.scala:339)
	at akka.actor.ActorCell.newActor(ActorCell.scala:534)
	at akka.actor.ActorCell.create(ActorCell.scala:560)
	... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
	at akka.util.Reflect$.instantiate(Reflect.scala:69)
	at akka.actor.Props.cachedActorClass(Props.scala:203)
	at akka.actor.Props.actorClass(Props.scala:327)
	at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
	... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
	at akka.util.Reflect$.instantiate(Reflect.scala:65)
	... 24 more


Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Tathagata Das <ta...@gmail.com>.
Another possible reason behind this may be that there are two versions of
Akka present on the classpath, interfering with each other. This could
happen in several ways:

1. Launching the Spark application with Scala brings in Scala's own Akka,
which interferes with Spark's Akka
2. Multiple Akkas are pulled in through transitive dependencies (see the
sbt sketch below)
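
A rough way to check and fix this in an sbt build (a hedged sketch: the
exclude/override mechanics are standard sbt, but "org.example" %% "some-lib"
and the Akka version are illustrative placeholders, not from this thread; a
plugin like sbt-dependency-graph shows where each Akka copy comes from, and
note that Spark 1.x itself pulls a shaded Akka under org.spark-project.akka):

    // build.sbt sketch: drop a transitively pulled-in Akka, then pin one version.
    libraryDependencies += ("org.example" %% "some-lib" % "1.0").excludeAll(
      ExclusionRule(organization = "com.typesafe.akka")  // drop its bundled Akka
    )
    // If several akka-actor versions still appear, force a single one.
    dependencyOverrides += "com.typesafe.akka" %% "akka-actor" % "2.2.3"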

TD



Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Rohit Rai <ro...@tuplejump.com>.
Alan/TD,

We are facing the problem in a project going to production.

Was there any progress on this? Are we able to confirm that this is a
bug/limitation in the current streaming code? Or is there anything wrong in
user scope?

Regards,
Rohit

*Founder & CEO, **Tuplejump, Inc.*
____________________________
www.tuplejump.com
*The Data Engineering Platform*


On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <al...@opsclarity.com> wrote:

> The stack trace was from running the ActorWordCount sample directly, without
> a Spark cluster, so I guess the logs would be from both? I enabled more
> logging and got this stack trace:
>
> 14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
>  14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(alan)
>  14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
> is: off
>  14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
>  14/07/25 17:55:27 [INFO] Remoting: Starting remoting
>  14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
> addresses :[akka.tcp://spark@leungshwingchun:52156]
>  14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
> akka.tcp://spark@leungshwingchun:52156]
>  14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
>  14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
>  14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at
> root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
>  14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
>  14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
> 297.0 MB.
>  14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
> with id = ConnectionManagerId(leungshwingchun,52157)
>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
> BlockManager
>  14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
> leungshwingchun:52157 with 297.0 MB RAM
>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>  14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
> http://192.168.1.233:52158
>  14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>  14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
> http://192.168.1.233:52159
>  14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
> http://leungshwingchun:4040
>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of successful kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of failed kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
>  14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
> related metrics
>  2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
> SCDynamicStore
> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
> found, setting default realm to empty
>  14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
> custom-built native-hadoop library...
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
>  14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
> back to shell based
>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
> mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
>  14/07/25 17:55:27 [DEBUG] Groups: Group mapping
> impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
> cacheTimeout=300000
>  14/07/25 17:55:28 [INFO] SparkContext: Added JAR
> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
> at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
> with timestamp 1406336128212
>  14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
>  14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
>  14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
> ms
>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
>  14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
>  14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
>  14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
>  14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
>  14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
>  14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
>  14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
>  14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
> ReceiverTracker.scala:275
>  14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
>  14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
> ReceiverTracker.scala:275)
>  14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
>  14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
> has no missing parents
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
> at time 1406336130000
>  14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
> 1406336130000 ms
>  14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
> Set(ResultTask(0, 0))
>  14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
> tasks
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
> TaskSet 0.0: ANY
>  14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
> TaskSet_0, runningTasks: 0
>  14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
> bytes in 8 ms
>  14/07/25 17:55:28 [INFO] Executor: Running task ID 0
>  14/07/25 17:55:28 [INFO] Executor: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with
> timestamp 1406336128212
>  14/07/25 17:55:28 [INFO] Utils: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
>  14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
>  14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
> directory
>  java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>  at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>  at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
>  at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
>  at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>  at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>  at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:722)
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
> So not using it.
>  14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
>  14/07/25 17:55:28 [INFO] Executor: Adding
> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
> to class loader
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator
> at time 1406336129000
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
>  14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
> initialized at:akka://spark/user/Supervisor0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
>  creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
> class akka.actor.ActorCell
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0
> from akka://spark
>  14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
> 0 from akka://spark
>  14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  akka.actor.ActorInitializationException: exception during creation
>  at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
>  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>  at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>  at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>  at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
>  at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>  at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>  at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>  at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129000
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129200
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129400
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129600
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129800
>  14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336130000
>
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com>
> wrote:
>
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs: the driver logs, as well as executor logs if possible?
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> bump.  any ideas?
>>

Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Prashant Sharma <sc...@gmail.com>.
This looks like a bug to me. It happens because we serialize the code that
starts the receiver and send it across, and since the Akka library's classes
have not been registered, it does not work. I have not tried it myself, but
including something like chill-akka
(https://github.com/xitrum-framework/chill-akka) might help. I am not very
familiar with how Kryo works internally; maybe someone else can throw some
light on this.

Prashant Sharma




> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
> has no missing parents
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
> at time 1406336130000
>  14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
> 1406336130000 ms
>  14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
>  14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
> Set(ResultTask(0, 0))
>  14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
> tasks
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
> TaskSet 0.0: ANY
>  14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
> TaskSet_0, runningTasks: 0
>  14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
> bytes in 8 ms
>  14/07/25 17:55:28 [INFO] Executor: Running task ID 0
>  14/07/25 17:55:28 [INFO] Executor: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with
> timestamp 1406336128212
>  14/07/25 17:55:28 [INFO] Utils: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
>  14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
>  14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
> directory
>  java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>  at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>  at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
>  at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
>  at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>  at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>  at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>  at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:722)
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
> So not using it.
>  14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
>  14/07/25 17:55:28 [INFO] Executor: Adding
> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
> to class loader
>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
>  14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator
> at time 1406336129000
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
>  14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
>  14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
> initialized at:akka://spark/user/Supervisor0
>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
>  creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
> class akka.actor.ActorCell
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0
> from akka://spark
>  14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
> 0 from akka://spark
>  14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  akka.actor.ActorInitializationException: exception during creation
>  at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
>  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>  at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>  at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>  at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>  at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
>  at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>  at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>  at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>  at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129000
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129200
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129400
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129600
>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129800
>  14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336130000
>
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com>
> wrote:
>
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs, driver as well as if possible executor logs.
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> bump.  any ideas?
>>
>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>>
>> it looks like when you configure sparkconfig to use the kryoserializer in
>> combination of using an ActorReceiver, bad things happen.  I modified the
>> ActorWordCount example program from
>>
>>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>
>> to
>>
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer”)
>>
>> and I get the stack trace below.  I figured it might be that Kryo doesn’t
>> know how to serialize/deserialize the actor so I added a registry.  I also
>> added a default empty constructor to SampleActorReceiver just for kicks
>>
>> class SerializationRegistry extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.register(classOf[SampleActorReceiver])
>>   }
>> }
>>
>> …
>>
>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>> extends Actor with ActorHelper {
>>   def this() = this(“”)
>>   ...
>> }
>>
>> ...
>>     val sparkConf = new SparkConf()
>>       .setAppName("ActorWordCount")
>>       .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.registrator",
>> "org.apache.spark.examples.streaming.SerializationRegistry")
>>
>>
>> None of this worked, same stack trace.  Any idea what’s going on?  Is
>> this a known issue and is there a workaround?
>>
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>  akka.actor.ActorInitializationException: exception during creation
>>  at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>>  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>  at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>  at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>>  at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>  at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>>  at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>>  at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>  at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>  at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>  at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>>
>>
>>
>
>

Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Alan Ngai <al...@opsclarity.com>.
The stack trace was from running the ActorWordCount sample directly, without a Spark cluster, so I guess the logs would be from both?  I enabled more logging and got this stack trace:

14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
 14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
 14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
 java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:722)
14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
 14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
 creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
	at akka.actor.ActorCell.create(ActorCell.scala:578)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
	at akka.dispatch.Mailbox.run(Mailbox.scala:218)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
	at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
	at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
	at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
	at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
	at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
	at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
	at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
	at akka.actor.Props.newActor(Props.scala:339)
	at akka.actor.ActorCell.newActor(ActorCell.scala:534)
	at akka.actor.ActorCell.create(ActorCell.scala:560)
	... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
	at akka.util.Reflect$.instantiate(Reflect.scala:69)
	at akka.actor.Props.cachedActorClass(Props.scala:203)
	at akka.actor.Props.actorClass(Props.scala:327)
	at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
	... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
	at akka.util.Reflect$.instantiate(Reflect.scala:65)
	... 24 more
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000
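
Looking at the bottom of that trace: Akka ends up invoking akka.actor.CreatorFunctionConsumer(scala.Function0) with arguments [class java.lang.Class, class ActorWordCount$$anonfun$2], which suggests the Props that wrapped the creator function comes back in the wrong shape after a serialization round trip.  For reference, here is a minimal sketch of the two Props shapes Akka supports (UrlReceiver is a made-up stand-in for SampleActorReceiver, and I have not verified the second shape as a workaround):

    import akka.actor.{Actor, Props}

    // Made-up stand-in for SampleActorReceiver, just to keep the sketch
    // self-contained.
    class UrlReceiver(urlOfPublisher: String) extends Actor {
      def receive = { case msg => println(s"$urlOfPublisher -> $msg") }
    }

    object PropsShapes {
      // Shape 1: by-name creator.  Akka wraps the block in a Function0 and
      // instantiates a CreatorFunctionConsumer around it.  If the Props gets
      // serialized and the Function0 does not survive intact, actor creation
      // fails with the "constructor ... is incompatible with arguments" error.
      val creatorProps: Props = Props(new UrlReceiver("akka://test/feeder"))

      // Shape 2: class plus constructor args.  Only the Class object and the
      // argument list travel with the Props, which may be more robust to
      // serialization.  Untested here; it is simply the other shape Akka offers.
      val classProps: Props = Props(classOf[UrlReceiver], "akka://test/feeder")
    }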

On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com> wrote:

> Is this error on the executor or on the driver? Can you provide a larger snippet of the logs, from the driver and, if possible, from the executors as well.
> 
> TD


Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Tathagata Das <ta...@gmail.com>.
Is this error on the executor or on the driver? Can you provide a larger
snippet of the logs, from the driver and, if possible, from the executors
as well.
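
If it helps, you can force DEBUG output programmatically before creating
the StreamingContext.  A rough sketch against the log4j 1.2 API that Spark
ships with (pick whichever logger names you care about):

    import org.apache.log4j.{Level, Logger}

    // Turn everything up to DEBUG...
    Logger.getRootLogger.setLevel(Level.DEBUG)
    // ...or limit the noise to the packages of interest.
    Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)
    Logger.getLogger("akka").setLevel(Level.DEBUG)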

TD


On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:

> bump.  any ideas?

Re: spark streaming actor receiver doesn't play well with kryoserializer

Posted by Alan Ngai <al...@opsclarity.com>.
bump.  any ideas?
