Posted to user@spark.apache.org by Pat Ferrel <pa...@occamsmachete.com> on 2014/06/26 21:13:22 UTC

Running new code on a Spark Cluster

I’ve created a CLI driver for a Spark version of a Mahout job called "item similarity", with several tests that all pass on local[4] standalone Spark. The code even reads and writes to clustered HDFS. But switching to clustered Spark exposes a problem that seems tied to broadcast and/or serialization.

The code uses HashBiMap, which is a Guava (Java) class. Two of these are created for every Mahout drm (a distributed matrix), for bi-directional row and column ID lookup. They are created once and then broadcast so they can be accessed in every task.
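
For reference, the dictionary construction amounts to roughly this (a paraphrase for illustration only, not the actual Mahout implementation, and assuming the IDs come in as a Seq[String]):

      import com.google.common.collect.HashBiMap

      // build a HashBiMap[String, Int] that assigns each ID an ordinal index
      def asOrderedDictionary(ids: Seq[String]): HashBiMap[String, Int] = {
        val dict = HashBiMap.create[String, Int]()
        ids.zipWithIndex.foreach { case (id, index) => dict.put(id, index) }
        dict
      }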

When I run this on clustered Spark I get the following error. At one point we were using HashMaps and they seemed to work on the cluster. So I suspect something about the HashBiMap is causing the problem. I’m also suspicious that it may have to do with serialization in the broadcast. Here is a snippet of code and the error. 

Any ideas?

      // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
      // broadcast them for access in distributed processes, so they are not recalculated in every task.
      // rowIDDictionary is a HashBiMap[String, Int]
      val rowIDDictionary = asOrderedDictionary(rowIDs) // this creates the HashBiMap in a non-distributed manner
      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)

      val columnIDDictionary = asOrderedDictionary(columnIDs) // this creates the HashBiMap in a non-distributed manner
      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)

      val indexedInteractions =
        interactions.map { case (rowID, columnID) =>   //<<<<<<<<<<< this is the stage being submitted before the error
          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get

          rowIndex -> columnIndex
        }

The error seems to happen while executing interactions.map, when the _bcast vals are accessed. Any idea where to start looking for this?

14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83), which has no missing parents
14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83)
14/06/26 11:23:36 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 2 tasks
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:0 as TID 16 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:0 as 2418 bytes in 0 ms
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:1 as TID 17 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:1 as 2440 bytes in 0 ms
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Lost TID 16 (task 9.0:0)
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
	at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
	at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
	at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:102)
	at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
	at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
	at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:69)
	at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:138)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1814)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1773)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
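
The bottom of the trace shows Kryo’s MapSerializer rebuilding the broadcast map by calling HashBiMap.put(), so one thing I may try is a plain Kryo round trip of the dictionary outside Spark, to see if that alone reproduces it (a sketch only, not code from the job):

      import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

      import com.esotericsoftware.kryo.Kryo
      import com.esotericsoftware.kryo.io.{Input, Output}

      val kryo = new Kryo()

      // write the dictionary the way a broadcast would
      val bytes = new ByteArrayOutputStream()
      val out = new Output(bytes)
      kryo.writeClassAndObject(out, rowIDDictionary)
      out.close()

      // read it back; if the default MapSerializer can't rebuild a HashBiMap,
      // this should fail the same way the executor does
      val in = new Input(new ByteArrayInputStream(bytes.toByteArray))
      val copy = kryo.readClassAndObject(in)
      in.close()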


Re: Running new code on a Spark Cluster

Posted by Pat Ferrel <pa...@occamsmachete.com>.
No, what did you have in mind? 

From the docs I assumed they’d work, and they do using local[4], but I’m not sure the broadcast does any actual serializing in that case.

I certainly could be off base about my suspicions since I’m just learning to interpret Spark error messages.
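
If it does turn out to be the Kryo/HashBiMap combination, one workaround I might try is telling Kryo to fall back to plain Java serialization for that one class via a registrator (a sketch only; the class name is made up, and spark.kryo.registrator would have to point at it):

      import com.esotericsoftware.kryo.Kryo
      import com.esotericsoftware.kryo.serializers.JavaSerializer
      import com.google.common.collect.HashBiMap
      import org.apache.spark.serializer.KryoRegistrator

      class DictionaryKryoRegistrator extends KryoRegistrator {
        override def registerClasses(kryo: Kryo) {
          // use Java serialization for HashBiMap instead of Kryo's generic
          // MapSerializer, which rebuilds the map through put()
          kryo.register(classOf[HashBiMap[_, _]], new JavaSerializer())
        }
      }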


On Jun 26, 2014, at 1:54 PM, Muttineni, Vinay <vm...@ebay.com> wrote:

Hi Pat,
Did you try accessing the broadcast variable value outside the Map?
https://google-collections.googlecode.com/svn/trunk/javadoc/com/google/common/collect/HashBiMap.html 
As per the document in the link above, it looks like HashBiMap can indeed be serialized.


RE: Running new code on a Spark Cluster

Posted by "Muttineni, Vinay" <vm...@ebay.com>.
Hi Pat,
Did you try accessing the broadcast variable value outside the Map?
https://google-collections.googlecode.com/svn/trunk/javadoc/com/google/common/collect/HashBiMap.html 
As per the document in the link above, it looks like HashBiMap can indeed be serialized.
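
For example, just on the driver right after the broadcast (rowIDDictionary_bcast being your broadcast variable):

      // if this works on the driver but the same lookup fails inside the map closure,
      // that points at (de)serialization of the broadcast on the executors
      println(rowIDDictionary_bcast.value.size())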
