You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/20 01:22:37 UTC

spark git commit: [SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE

Repository: spark
Updated Branches:
  refs/heads/master 9eacc5e43 -> 9b57cd8d5


[SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE

## What changes were proposed in this pull request?

Fix HighlyCompressedMapStatus#writeExternal NPE:
```
17/06/18 15:00:27 ERROR Utils: Exception encountered
java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException
java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        ... 17 more
17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188
17/06/18 15:00:27 ERROR Utils: Exception encountered
java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

## How was this patch tested?

manual tests

Author: Yuming Wang <wg...@gmail.com>

Closes #18343 from wangyum/SPARK-21133.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b57cd8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b57cd8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b57cd8d

Branch: refs/heads/master
Commit: 9b57cd8d5c594731a7b3c90ce59bcddb05193d79
Parents: 9eacc5e
Author: Yuming Wang <wg...@gmail.com>
Authored: Tue Jun 20 09:22:30 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jun 20 09:22:30 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala    |  2 +-
 .../apache/spark/serializer/KryoSerializer.scala  |  1 +
 .../apache/spark/scheduler/MapStatusSuite.scala   | 18 ++++++++++++++++--
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 048e0d0..5e45b37 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    @transient private var hugeBlockSizes: Map[Int, Byte])
+    private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization

http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index e15166d..4f03e54 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(None.getClass)
     kryo.register(Nil.getClass)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
+    kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(classOf[ArrayBuffer[Any]])
 
     kryo.setClassLoader(classLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 3ec37f6..e612013 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -24,9 +24,9 @@ import scala.util.Random
 import org.mockito.Mockito._
 import org.roaringbitmap.RoaringBitmap
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.internal.config
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.storage.BlockManagerId
 
 class MapStatusSuite extends SparkFunSuite {
@@ -154,4 +154,18 @@ class MapStatusSuite extends SparkFunSuite {
       case part => assert(status2.getSizeForBlock(part) >= sizes(part))
     }
   }
+
+  test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
+    val conf = new SparkConf()
+      .set("spark.serializer", classOf[KryoSerializer].getName)
+      .setMaster("local")
+      .setAppName("SPARK-21133")
+    val sc = new SparkContext(conf)
+    try {
+      val count = sc.parallelize(0 until 3000, 10).repartition(2001).collect().length
+      assert(count === 3000)
+    } finally {
+      sc.stop()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org