Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/01 18:33:55 UTC
spark git commit: [SPARK-18617][BACKPORT] Follow up PR to Close "kryo auto pick" feature for Spark Streaming
Repository: spark
Updated Branches:
refs/heads/branch-2.0 5ecd3c23a -> 6e3fd2b98
[SPARK-18617][BACKPORT] Follow up PR to Close "kryo auto pick" feature for Spark Streaming
## What changes were proposed in this pull request?
This is a follow-up PR to backport #16052 to branch-2.0, with the incremental update from #16091.
## How was this patch tested?
A new unit test.
cc zsxwing rxin
Author: uncleGen <hu...@gmail.com>
Closes #16096 from uncleGen/branch-2.0-SPARK-18617.
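
In effect, the patch threads an explicit `autoPick` flag through serializer selection so that receiver-generated stream blocks are never auto-picked for Kryo. The motivation (per SPARK-18560) is that such blocks are written with a known class tag but read back without one, so the write and read sides could otherwise disagree on the serializer. Below is a minimal sketch of the resulting decision logic; the stand-in types and the simplified `canUseKryo` are illustrative assumptions, not the real SerializerManager internals:

    import scala.reflect.ClassTag

    // Illustrative stand-ins; the real fields live on SerializerManager.
    sealed trait Serializer
    case object KryoSerializer extends Serializer
    case object DefaultJavaSerializer extends Serializer

    // Simplified: the real check also covers primitive-array class tags.
    def canUseKryo(ct: ClassTag[_]): Boolean =
      ct.runtimeClass.isPrimitive || ct.runtimeClass == classOf[String]

    // The new signature: Kryo is auto-picked only when the caller allows it.
    def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer =
      if (autoPick && canUseKryo(ct)) KryoSerializer else DefaultJavaSerializer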
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3fd2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3fd2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3fd2b9
Branch: refs/heads/branch-2.0
Commit: 6e3fd2b981e36f7f474781f22e606111d6ad13d5
Parents: 5ecd3c2
Author: uncleGen <hu...@gmail.com>
Authored: Thu Dec 1 10:33:51 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Dec 1 10:33:51 2016 -0800
----------------------------------------------------------------------
.../spark/serializer/SerializerManager.scala | 16 +++++++----
.../spark/storage/memory/MemoryStore.scala | 5 ++--
.../storage/PartiallySerializedBlockSuite.scala | 6 ++--
.../spark/streaming/StreamingContextSuite.scala | 29 ++++++++++++++++++++
4 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 59bdc88..76b985c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -72,8 +72,11 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}
- def getSerializer(ct: ClassTag[_]): Serializer = {
- if (canUseKryo(ct)) {
+ // SPARK-18617: The feature from SPARK-13990 cannot be applied to Spark Streaming yet; at
+ // worst, a streaming job in `Receiver` mode cannot run properly on Spark 2.x. Disabling the
+ // `kryo auto pick` feature for streaming is therefore a reasonable first step.
+ def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+ if (autoPick && canUseKryo(ct)) {
kryoSerializer
} else {
defaultSerializer
@@ -122,7 +125,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
outputStream: OutputStream,
values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
@@ -138,7 +142,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
classTag: ClassTag[_]): ChunkedByteBuffer = {
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
val byteStream = new BufferedOutputStream(bbos)
- val ser = getSerializer(classTag).newInstance()
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
bbos.toChunkedByteBuffer
}
@@ -152,7 +157,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
inputStream: InputStream)
(classTag: ClassTag[T]): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
- getSerializer(classTag)
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ getSerializer(classTag, autoPick)
.newInstance()
.deserializeStream(wrapForCompression(blockId, stream))
.asIterator.asInstanceOf[Iterator[T]]
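
Each call site in this file derives the flag the same way from the block id. A hedged usage sketch of the effect (reusing the stand-in `getSerializer` from the note above):

    import org.apache.spark.storage.{BlockId, StreamBlockId}

    // Receiver-generated blocks carry a StreamBlockId, so auto-pick is off:
    // writes and later reads (which lack the class tag) agree on the
    // default serializer.
    val streamBlock: BlockId = StreamBlockId(streamId = 0, uniqueId = 1L)
    val autoPick = !streamBlock.isInstanceOf[StreamBlockId]  // false here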
http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9b87c42..68dff85 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
@@ -332,7 +332,8 @@ private[spark] class MemoryStore(
val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
- val ser = serializerManager.getSerializer(classTag).newInstance()
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index ec4f263..3050f9a 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
spy
}
- val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+ val serializer = serializerManager
+ .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
redirectableOutputStream.setOutputStream(bbos)
val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
Mockito.verifyNoMoreInteractions(memoryStore)
Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
- val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+ val serializer = serializerManager
+ .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val deserialized =
serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
assert(deserialized === items)
http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5..35eeb9d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@@ -806,6 +807,34 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc.stop()
}
+ test("SPARK-18560 Receiver data should be deserialized properly.") {
+ // Start a two-node cluster, so the receiver will use one node and Spark jobs will use the
+ // other. Spark jobs then need to fetch remote blocks, which triggers SPARK-18560.
+ val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+ ssc = new StreamingContext(conf, Milliseconds(100))
+ val input = ssc.receiverStream(new TestReceiver)
+ val latch = new CountDownLatch(1)
+ input.count().foreachRDD { rdd =>
+ // Make sure we can read from BlockRDD
+ if (rdd.collect().headOption.getOrElse(0L) > 0) {
+ // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+ new Thread() {
+ setDaemon(true)
+ override def run(): Unit = {
+ ssc.stop(stopSparkContext = true, stopGracefully = false)
+ latch.countDown()
+ }
+ }.start()
+ }
+ }
+ ssc.start()
+ ssc.awaitTerminationOrTimeout(60000)
+ // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an
+ // active SparkContext. Note: the stop code in `after` will do nothing if `ssc.stop` in this
+ // test is running.
+ assert(latch.await(60, TimeUnit.SECONDS))
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)