You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Alan Ngai <al...@opsclarity.com> on 2014/07/24 12:09:00 UTC
spark streaming actor receiver doesn't play well with kryoserializer
it looks like when you configure sparkconfig to use the kryoserializer in combination of using an ActorReceiver, bad things happen. I modified the ActorWordCount example program from
val sparkConf = new SparkConf().setAppName("ActorWordCount")
to
val sparkConf = new SparkConf()
.setAppName("ActorWordCount")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)
and I get the stack trace below. I figured it might be that Kryo doesn’t know how to serialize/deserialize the actor so I added a registry. I also added a default empty constructor to SampleActorReceiver just for kicks
class SerializationRegistry extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[SampleActorReceiver])
}
}
…
case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
def this() = this(“”)
...
}
...
val sparkConf = new SparkConf()
.setAppName("ActorWordCount")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")
None of this worked, same stack trace. Any idea what’s going on? Is this a known issue and is there a workaround?
14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Tathagata Das <ta...@gmail.com>.
Another possible reason behind this maybe that there are two versions of
Akka present in the classpath, which are interfering with each other. This
could happen through many scenarios.
1. Launching Spark application with Scala brings in Akka from Scala, which
interferes with Spark's Akka
2. Multiple Akka through some transitive dependencies
TD
On Thu, Aug 7, 2014 at 2:30 AM, Rohit Rai <ro...@tuplejump.com> wrote:
> Alan/TD,
>
> We are facing the problem in a project going to production.
>
> Was there any progress on this? Are we able to confirm that this is a
> bug/limitation in the current streaming code? Or there is anything wrong in
> user scope?
>
> Regards,
> Rohit
>
> *Founder & CEO, **Tuplejump, Inc.*
> ____________________________
> www.tuplejump.com
> *The Data Engineering Platform*
>
>
> On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> The stack trace was from running the Actor count sample directly, without
>> a spark cluster, so I guess the logs would be from both? I enabled more
>> logging and got this stack trace
>>
>> 4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
>> 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(alan)
>> 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
>> is: off
>> 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
>> 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
>> 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
>> addresses :[akka.tcp://spark@leungshwingchun:52156]
>> 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
>> akka.tcp://spark@leungshwingchun:52156]
>> 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
>> 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
>> 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories
>> at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
>> 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
>> 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
>> 297.0 MB.
>> 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
>> with id = ConnectionManagerId(leungshwingchun,52157)
>> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
>> BlockManager
>> 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
>> leungshwingchun:52157 with 297.0 MB RAM
>> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
>> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>> 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
>> http://192.168.1.233:52158
>> 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
>> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>> 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
>> http://192.168.1.233:52159
>> 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
>> http://leungshwingchun:4040
>> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
>> org.apache.hadoop.metrics2.lib.MutableRate
>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
>> with annotation
>> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
>> value=[Rate of successful kerberos logins and latency (milliseconds)],
>> always=false, type=DEFAULT, sampleName=Ops)
>> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
>> org.apache.hadoop.metrics2.lib.MutableRate
>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
>> with annotation
>> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
>> value=[Rate of failed kerberos logins and latency (milliseconds)],
>> always=false, type=DEFAULT, sampleName=Ops)
>> 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
>> related metrics
>> 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
>> SCDynamicStore
>> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
>> found, setting default realm to empty
>> 14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
>> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
>> custom-built native-hadoop library...
>> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
>> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
>> 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
>> back to shell based
>> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
>> mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
>> 14/07/25 17:55:27 [DEBUG] Groups: Group mapping
>> impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
>> cacheTimeout=300000
>> 14/07/25 17:55:28 [INFO] SparkContext: Added JAR
>> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
>> at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
>> with timestamp 1406336128212
>> 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
>> 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
>> 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
>> 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval =
>> null
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
>> ms
>> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and
>> validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
>> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
>> 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
>> 14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
>> 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
>> 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
>> 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
>> 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
>> 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
>> 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
>> 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
>> 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
>> 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
>> ReceiverTracker.scala:275
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
>> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
>> ReceiverTracker.scala:275)
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
>> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
>> 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
>> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
>> has no missing parents
>> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
>> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
>> at time 1406336130000
>> 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
>> 1406336130000 ms
>> 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
>> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
>> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
>> 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
>> Set(ResultTask(0, 0))
>> 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
>> tasks
>> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
>> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
>> TaskSet 0.0: ANY
>> 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
>> TaskSet_0, runningTasks: 0
>> 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
>> executor localhost: localhost (PROCESS_LOCAL)
>> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>> 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
>> bytes in 8 ms
>> 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
>> 14/07/25 17:55:28 [INFO] Executor: Fetching
>> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
>> with timestamp 1406336128212
>> 14/07/25 17:55:28 [INFO] Utils: Fetching
>> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
>> 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
>> 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
>> directory
>> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>> at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
>> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
>> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
>> at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
>> at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>> at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at org.apache.spark.executor.Executor.org
>> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:722)
>> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
>> So not using it.
>> 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
>> 14/07/25 17:55:28 [INFO] Executor: Adding
>> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
>> to class loader
>> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>> 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
>> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
>> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for
>> BlockGenerator at time 1406336129000
>> 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
>> 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
>> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
>> 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
>> initialized at:akka://spark/user/Supervisor0
>> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
>> creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
>> class akka.actor.ActorCell
>> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
>> 0 from akka://spark
>> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
>> 0 from akka://spark
>> 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> akka.actor.ActorInitializationException: exception during creation
>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>> at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>> at akka.util.Reflect$.instantiate(Reflect.scala:69)
>> at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129000
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129200
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129400
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129600
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129800
>> 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336130000
>>
>> On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com>
>> wrote:
>>
>> Is this error on the executor or on the driver? Can you provide a larger
>> snippet of the logs, driver as well as if possible executor logs.
>>
>> TD
>>
>>
>> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
>>
>>> bump. any ideas?
>>>
>>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>>>
>>> it looks like when you configure sparkconfig to use the kryoserializer
>>> in combination of using an ActorReceiver, bad things happen. I modified
>>> the ActorWordCount example program from
>>>
>>> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>>
>>> to
>>>
>>> val sparkConf = new SparkConf()
>>> .setAppName("ActorWordCount")
>>> .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer”)
>>>
>>> and I get the stack trace below. I figured it might be that Kryo
>>> doesn’t know how to serialize/deserialize the actor so I added a registry.
>>> I also added a default empty constructor to SampleActorReceiver just for
>>> kicks
>>>
>>> class SerializationRegistry extends KryoRegistrator {
>>> override def registerClasses(kryo: Kryo) {
>>> kryo.register(classOf[SampleActorReceiver])
>>> }
>>> }
>>>
>>> …
>>>
>>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>>> extends Actor with ActorHelper {
>>> def this() = this(“”)
>>> ...
>>> }
>>>
>>> ...
>>> val sparkConf = new SparkConf()
>>> .setAppName("ActorWordCount")
>>> .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>> .set("spark.kryo.registrator",
>>> "org.apache.spark.examples.streaming.SerializationRegistry")
>>>
>>>
>>> None of this worked, same stack trace. Any idea what’s going on? Is
>>> this a known issue and is there a workaround?
>>>
>>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>> akka.actor.ActorInitializationException: exception during creation
>>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: akka.ConfigurationException: configuration problem while
>>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>>> at
>>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>>> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>>> at
>>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>> at
>>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>> at
>>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>>> at akka.actor.Props.newActor(Props.scala:339)
>>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>>> ... 9 more
>>> Caused by: java.lang.IllegalArgumentException: constructor public
>>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>>> arguments [class java.lang.Class, class
>>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>> at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>> at akka.actor.Props.cachedActorClass(Props.scala:203)
>>> at akka.actor.Props.actorClass(Props.scala:327)
>>> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>>> ... 20 more
>>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>> ... 24 more
>>>
>>>
>>>
>>
>>
>
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Rohit Rai <ro...@tuplejump.com>.
Alan/TD,
We are facing the problem in a project going to production.
Was there any progress on this? Are we able to confirm that this is a
bug/limitation in the current streaming code? Or there is anything wrong in
user scope?
Regards,
Rohit
*Founder & CEO, **Tuplejump, Inc.*
____________________________
www.tuplejump.com
*The Data Engineering Platform*
On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <al...@opsclarity.com> wrote:
> The stack trace was from running the Actor count sample directly, without
> a spark cluster, so I guess the logs would be from both? I enabled more
> logging and got this stack trace
>
> 4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
> 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(alan)
> 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
> is: off
> 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
> 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
> 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
> addresses :[akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
> akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
> 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at
> root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
> 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
> 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
> 297.0 MB.
> 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
> with id = ConnectionManagerId(leungshwingchun,52157)
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
> BlockManager
> 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
> leungshwingchun:52157 with 297.0 MB RAM
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
> http://192.168.1.233:52158
> 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
> http://192.168.1.233:52159
> 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
> http://leungshwingchun:4040
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of successful kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of failed kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
> related metrics
> 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
> SCDynamicStore
> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
> found, setting default realm to empty
> 14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
> custom-built native-hadoop library...
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
> 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
> back to shell based
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
> mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
> 14/07/25 17:55:27 [DEBUG] Groups: Group mapping
> impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
> cacheTimeout=300000
> 14/07/25 17:55:28 [INFO] SparkContext: Added JAR
> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
> at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
> with timestamp 1406336128212
> 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
> 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
> 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
> ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
> 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
> 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
> 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
> ReceiverTracker.scala:275
> 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
> ReceiverTracker.scala:275)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
> has no missing parents
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
> at time 1406336130000
> 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
> 1406336130000 ms
> 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
> Set(ResultTask(0, 0))
> 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
> tasks
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
> TaskSet 0.0: ANY
> 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
> TaskSet_0, runningTasks: 0
> 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
> bytes in 8 ms
> 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
> 14/07/25 17:55:28 [INFO] Executor: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with
> timestamp 1406336128212
> 14/07/25 17:55:28 [INFO] Utils: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
> 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
> 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
> directory
> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
> at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
> at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
> So not using it.
> 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
> 14/07/25 17:55:28 [INFO] Executor: Adding
> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
> to class loader
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator
> at time 1406336129000
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
> 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
> initialized at:akka://spark/user/Supervisor0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
> creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
> class akka.actor.ActorCell
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0
> from akka://spark
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
> 0 from akka://spark
> 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
> at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
> at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
> at akka.util.Reflect$.instantiate(Reflect.scala:69)
> at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
> at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129000
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129200
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129400
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129600
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129800
> 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336130000
>
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com>
> wrote:
>
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs, driver as well as if possible executor logs.
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> bump. any ideas?
>>
>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>>
>> it looks like when you configure sparkconfig to use the kryoserializer in
>> combination of using an ActorReceiver, bad things happen. I modified the
>> ActorWordCount example program from
>>
>> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>
>> to
>>
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer”)
>>
>> and I get the stack trace below. I figured it might be that Kryo doesn’t
>> know how to serialize/deserialize the actor so I added a registry. I also
>> added a default empty constructor to SampleActorReceiver just for kicks
>>
>> class SerializationRegistry extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo) {
>> kryo.register(classOf[SampleActorReceiver])
>> }
>> }
>>
>> …
>>
>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>> extends Actor with ActorHelper {
>> def this() = this(“”)
>> ...
>> }
>>
>> ...
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator",
>> "org.apache.spark.examples.streaming.SerializationRegistry")
>>
>>
>> None of this worked, same stack trace. Any idea what’s going on? Is
>> this a known issue and is there a workaround?
>>
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> akka.actor.ActorInitializationException: exception during creation
>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>> at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>> at akka.util.Reflect$.instantiate(Reflect.scala:69)
>> at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>>
>>
>>
>
>
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Prashant Sharma <sc...@gmail.com>.
This looks like a bug to me. This happens because we serialize the code
that starts the receiver and send it across. And since we have not
registered the classes of akka library it does not work. I have not tried
myself, but may be by including something like chill-akka (
https://github.com/xitrum-framework/chill-akka) might help. I am not well
aware about how kryo works internally, may be someone else can throw some
light on this.
Prashant Sharma
On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <al...@opsclarity.com> wrote:
> The stack trace was from running the Actor count sample directly, without
> a spark cluster, so I guess the logs would be from both? I enabled more
> logging and got this stack trace
>
> 4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
> 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(alan)
> 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
> is: off
> 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
> 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
> 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
> addresses :[akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
> akka.tcp://spark@leungshwingchun:52156]
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
> 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
> 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at
> root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
> 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
> 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
> 297.0 MB.
> 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
> with id = ConnectionManagerId(leungshwingchun,52157)
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
> BlockManager
> 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
> leungshwingchun:52157 with 297.0 MB RAM
> 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
> http://192.168.1.233:52158
> 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
> 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
> 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
> 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
> http://192.168.1.233:52159
> 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
> http://leungshwingchun:4040
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of successful kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
> value=[Rate of failed kerberos logins and latency (milliseconds)],
> always=false, type=DEFAULT, sampleName=Ops)
> 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
> related metrics
> 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
> SCDynamicStore
> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
> found, setting default realm to empty
> 14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
> custom-built native-hadoop library...
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
> 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
> 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
> back to shell based
> 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
> mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
> 14/07/25 17:55:27 [DEBUG] Groups: Group mapping
> impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
> cacheTimeout=300000
> 14/07/25 17:55:28 [INFO] SparkContext: Added JAR
> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
> at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
> with timestamp 1406336128212
> 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
> 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
> 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
> ms
> 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
> 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
> 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
> 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
> 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
> 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
> ReceiverTracker.scala:275
> 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
> ReceiverTracker.scala:275)
> 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
> has no missing parents
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
> at time 1406336130000
> 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
> 1406336130000 ms
> 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
> 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
> Set(ResultTask(0, 0))
> 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
> tasks
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
> 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
> TaskSet 0.0: ANY
> 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
> TaskSet_0, runningTasks: 0
> 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
> bytes in 8 ms
> 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
> 14/07/25 17:55:28 [INFO] Executor: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with
> timestamp 1406336128212
> 14/07/25 17:55:28 [INFO] Utils: Fetching
> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
> 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
> 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
> directory
> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
> at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
> at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
> So not using it.
> 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
> 14/07/25 17:55:28 [INFO] Executor: Adding
> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
> to class loader
> 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
> org.apache.spark.examples.streaming.SerializationRegistry
> 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
> 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator
> at time 1406336129000
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
> 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
> 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
> initialized at:akka://spark/user/Supervisor0
> 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
> creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
> class akka.actor.ActorCell
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0
> from akka://spark
> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
> 0 from akka://spark
> 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
> at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
> at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
> at akka.util.Reflect$.instantiate(Reflect.scala:69)
> at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
> at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129000
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129200
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129400
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129600
> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336129800
> 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
> called at time 1406336130000
>
> On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com>
> wrote:
>
> Is this error on the executor or on the driver? Can you provide a larger
> snippet of the logs, driver as well as if possible executor logs.
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> bump. any ideas?
>>
>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>>
>> it looks like when you configure sparkconfig to use the kryoserializer in
>> combination of using an ActorReceiver, bad things happen. I modified the
>> ActorWordCount example program from
>>
>> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>
>> to
>>
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer”)
>>
>> and I get the stack trace below. I figured it might be that Kryo doesn’t
>> know how to serialize/deserialize the actor so I added a registry. I also
>> added a default empty constructor to SampleActorReceiver just for kicks
>>
>> class SerializationRegistry extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo) {
>> kryo.register(classOf[SampleActorReceiver])
>> }
>> }
>>
>> …
>>
>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>> extends Actor with ActorHelper {
>> def this() = this(“”)
>> ...
>> }
>>
>> ...
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator",
>> "org.apache.spark.examples.streaming.SerializationRegistry")
>>
>>
>> None of this worked, same stack trace. Any idea what’s going on? Is
>> this a known issue and is there a workaround?
>>
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> akka.actor.ActorInitializationException: exception during creation
>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>> at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>> at akka.util.Reflect$.instantiate(Reflect.scala:69)
>> at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>>
>>
>>
>
>
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Alan Ngai <al...@opsclarity.com>.
The stack trace was from running the Actor count sample directly, without a spark cluster, so I guess the logs would be from both? I enabled more logging and got this stack trace
4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
14/07/25 17:55:27 [INFO] Remoting: Starting remoting
14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
14/07/25 17:55:28 [INFO] Executor: Running task ID 0
14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000
On Jul 25, 2014, at 3:20 PM, Tathagata Das <ta...@gmail.com> wrote:
> Is this error on the executor or on the driver? Can you provide a larger snippet of the logs, driver as well as if possible executor logs.
>
> TD
>
>
> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
> bump. any ideas?
>
> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>
>> it looks like when you configure sparkconfig to use the kryoserializer in combination of using an ActorReceiver, bad things happen. I modified the ActorWordCount example program from
>>
>> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>
>> to
>>
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)
>>
>> and I get the stack trace below. I figured it might be that Kryo doesn’t know how to serialize/deserialize the actor so I added a registry. I also added a default empty constructor to SampleActorReceiver just for kicks
>>
>> class SerializationRegistry extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo) {
>> kryo.register(classOf[SampleActorReceiver])
>> }
>> }
>>
>> …
>>
>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>> extends Actor with ActorHelper {
>> def this() = this(“”)
>> ...
>> }
>>
>> ...
>> val sparkConf = new SparkConf()
>> .setAppName("ActorWordCount")
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")
>>
>>
>> None of this worked, same stack trace. Any idea what’s going on? Is this a known issue and is there a workaround?
>>
>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> akka.actor.ActorInitializationException: exception during creation
>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>> at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>> at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>> at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>> at akka.util.Reflect$.instantiate(Reflect.scala:69)
>> at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>>
>
>
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Tathagata Das <ta...@gmail.com>.
Is this error on the executor or on the driver? Can you provide a larger
snippet of the logs, driver as well as if possible executor logs.
TD
On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <al...@opsclarity.com> wrote:
> bump. any ideas?
>
> On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
>
> it looks like when you configure sparkconfig to use the kryoserializer in
> combination of using an ActorReceiver, bad things happen. I modified the
> ActorWordCount example program from
>
> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>
> to
>
> val sparkConf = new SparkConf()
> .setAppName("ActorWordCount")
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer”)
>
> and I get the stack trace below. I figured it might be that Kryo doesn’t
> know how to serialize/deserialize the actor so I added a registry. I also
> added a default empty constructor to SampleActorReceiver just for kicks
>
> class SerializationRegistry extends KryoRegistrator {
> override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[SampleActorReceiver])
> }
> }
>
> …
>
> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
> extends Actor with ActorHelper {
> def this() = this(“”)
> ...
> }
>
> ...
> val sparkConf = new SparkConf()
> .setAppName("ActorWordCount")
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator",
> "org.apache.spark.examples.streaming.SerializationRegistry")
>
>
> None of this worked, same stack trace. Any idea what’s going on? Is this
> a known issue and is there a workaround?
>
> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
> at
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
> at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
> at akka.util.Reflect$.instantiate(Reflect.scala:69)
> at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
> at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
>
>
>
Re: spark streaming actor receiver doesn't play well with kryoserializer
Posted by Alan Ngai <al...@opsclarity.com>.
bump. any ideas?
On Jul 24, 2014, at 3:09 AM, Alan Ngai <al...@opsclarity.com> wrote:
> it looks like when you configure sparkconfig to use the kryoserializer in combination of using an ActorReceiver, bad things happen. I modified the ActorWordCount example program from
>
> val sparkConf = new SparkConf().setAppName("ActorWordCount")
>
> to
>
> val sparkConf = new SparkConf()
> .setAppName("ActorWordCount")
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)
>
> and I get the stack trace below. I figured it might be that Kryo doesn’t know how to serialize/deserialize the actor so I added a registry. I also added a default empty constructor to SampleActorReceiver just for kicks
>
> class SerializationRegistry extends KryoRegistrator {
> override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[SampleActorReceiver])
> }
> }
>
> …
>
> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
> extends Actor with ActorHelper {
> def this() = this(“”)
> ...
> }
>
> ...
> val sparkConf = new SparkConf()
> .setAppName("ActorWordCount")
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")
>
>
> None of this worked, same stack trace. Any idea what’s going on? Is this a known issue and is there a workaround?
>
> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
> at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
> at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
> at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
> at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
> at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
> at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
> at akka.actor.Props.newActor(Props.scala:339)
> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
> at akka.actor.ActorCell.create(ActorCell.scala:560)
> ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
> at akka.util.Reflect$.instantiate(Reflect.scala:69)
> at akka.actor.Props.cachedActorClass(Props.scala:203)
> at akka.actor.Props.actorClass(Props.scala:327)
> at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
> at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
> at akka.util.Reflect$.instantiate(Reflect.scala:65)
> ... 24 more
>