You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/10 08:58:03 UTC

[GitHub] [spark] Ngone51 commented on a diff in pull request #38064: [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB

Ngone51 commented on code in PR #38064:
URL: https://github.com/apache/spark/pull/38064#discussion_r991002266


##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {
+  def serializeToChunkedBuffer[T: ClassTag](
+    serializerInstance: SerializerInstance,
+    t: T): ChunkedByteBuffer = {
+    val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
+    val out = serializerInstance.serializeStream(cbbos)
+    out.writeObject(t)
+    out.close()
+    cbbos.close()
+    cbbos.toChunkedByteBuffer
+  }

Review Comment:
   leave a blank line after this?



##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -659,25 +660,27 @@ private[spark] class Executor(
         val accumUpdates = task.collectAccumulatorUpdates()
         val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
         // TODO: do not serialize value twice
-        val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
-        val serializedDirectResult = ser.serialize(directResult)
-        val resultSize = serializedDirectResult.limit()
+        val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+        val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult)
+        val resultSize = serializedDirectResult.size
 
         // directSend = sending directly back to the driver
-        val serializedResult: ByteBuffer = {
+        val serializedResult: ChunkedByteBuffer = {
           if (maxResultSize > 0 && resultSize > maxResultSize) {
             logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
               s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
               s"dropping it.")
-            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
+            SerializerHelper.serializeToChunkedBuffer(ser,
+              new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))

Review Comment:
   The object `IndirectTaskResult` should be small. Do we also want to use chunked buffer for it? 



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -42,8 +42,9 @@ import org.apache.spark.util.Utils
  *               buffers may also be used elsewhere then the caller is responsible for copying
  *               them as needed.
  */
-private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) extends Externalizable {
   require(chunks != null, "chunks must not be null")
+  require(!chunks.contains(null), "chunks must not be null")

Review Comment:
   ```suggestion
     require(!chunks.contains(null), "chunks must not contain null")
   ```



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,104 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to stream with zero copy if possible
+   */
+  def writeToStream(out: OutputStream): Unit = {
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
+      } else {
+        // fallback to copy approach
+        if (buffer == null) {
+          buffer = new Array[Byte](bufferLen)
+        }
+        val originalPos = chunk.position()
+        chunk.rewind()
+        var bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        while (bytesToRead > 0) {
+          chunk.get(buffer, 0, bytesToRead)
+          out.write(buffer, 0, bytesToRead)
+          bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        }
+        chunk.position(originalPos)

Review Comment:
   Shall we extract this common code block to a function?



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,104 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to stream with zero copy if possible
+   */
+  def writeToStream(out: OutputStream): Unit = {
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
+      } else {
+        // fallback to copy approach
+        if (buffer == null) {
+          buffer = new Array[Byte](bufferLen)
+        }
+        val originalPos = chunk.position()
+        chunk.rewind()
+        var bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        while (bytesToRead > 0) {
+          chunk.get(buffer, 0, bytesToRead)
+          out.write(buffer, 0, bytesToRead)
+          bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        }
+        chunk.position(originalPos)
+      }
+    })
+  }
+
+  /**
+   * write to ObjectOutput with zero copy if possible
+   */
+  override def writeExternal(out: ObjectOutput): Unit = {
+    // we want to keep the chunks layout
+    out.writeInt(chunks.length)
+    chunks.foreach(buffer => out.writeInt(buffer.limit()))
+    chunks.foreach(buffer => out.writeBoolean(buffer.isDirect))
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
+      } else {
+        // fallback to copy approach
+        if (buffer == null) {
+          buffer = new Array[Byte](bufferLen)
+        }
+        val originalPos = chunk.position()
+        chunk.rewind()
+        var bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        while (bytesToRead > 0) {
+          chunk.get(buffer, 0, bytesToRead)
+          out.write(buffer, 0, bytesToRead)
+          bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        }
+        chunk.position(originalPos)
+      }
+    })
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    val chunksNum = in.readInt()
+    val indices = 0 until chunksNum
+    val chunksSize = indices.map(_ => in.readInt())
+    val chunksDirect = indices.map(_ => in.readBoolean())
+    val chunks = new Array[ByteBuffer](chunksNum)
+
+    val copyBufferLen = 1024 * 1024
+    val copyBuffer: Array[Byte] = if (chunksDirect.exists(identity)) {
+      new Array[Byte](copyBufferLen)
+    } else {
+      null
+    }
+
+    indices.foreach(i => {

Review Comment:
   ```scala
   indices.foreach { i =>
     ...
   }
   ```



##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {
+  def serializeToChunkedBuffer[T: ClassTag](
+    serializerInstance: SerializerInstance,
+    t: T): ChunkedByteBuffer = {
+    val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
+    val out = serializerInstance.serializeStream(cbbos)
+    out.writeObject(t)
+    out.close()
+    cbbos.close()
+    cbbos.toChunkedByteBuffer
+  }
+  def deserializeFromChunkedBuffer[T: ClassTag](
+    serializerInstance: SerializerInstance,
+    bytes: ChunkedByteBuffer): T = {

Review Comment:
   4 indents



##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -659,25 +660,27 @@ private[spark] class Executor(
         val accumUpdates = task.collectAccumulatorUpdates()
         val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
         // TODO: do not serialize value twice
-        val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
-        val serializedDirectResult = ser.serialize(directResult)
-        val resultSize = serializedDirectResult.limit()
+        val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+        val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult)
+        val resultSize = serializedDirectResult.size
 
         // directSend = sending directly back to the driver
-        val serializedResult: ByteBuffer = {
+        val serializedResult: ChunkedByteBuffer = {
           if (maxResultSize > 0 && resultSize > maxResultSize) {
             logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
               s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
               s"dropping it.")
-            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
+            SerializerHelper.serializeToChunkedBuffer(ser,
+              new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
           } else if (resultSize > maxDirectResultSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId,
-              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
+              serializedDirectResult,
               StorageLevel.MEMORY_AND_DISK_SER)
             logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)")
-            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
+            SerializerHelper.serializeToChunkedBuffer(ser,
+              new IndirectTaskResult[Any](blockId, resultSize))

Review Comment:
   same here.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala:
##########
@@ -24,44 +24,49 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.metrics.ExecutorMetricType
-import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.serializer.{SerializerHelper, SerializerInstance}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 // Task result. Also contains updates to accumulator variables and executor metric peaks.
 private[spark] sealed trait TaskResult[T]
 
 /** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */
-private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
+private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Long)
   extends TaskResult[T] with Serializable
 
 /** A TaskResult that contains the task's return value, accumulator updates and metric peaks. */
 private[spark] class DirectTaskResult[T](
-    var valueBytes: ByteBuffer,
+  var valueByteBuffer: ChunkedByteBuffer,

Review Comment:
   pls keep the indentation unchanged.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,104 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to stream with zero copy if possible
+   */
+  def writeToStream(out: OutputStream): Unit = {
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {

Review Comment:
   nit:
   ```scala
   chunks.foreach { chunk =>
     ...
   }
   ```



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,104 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to stream with zero copy if possible
+   */
+  def writeToStream(out: OutputStream): Unit = {
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
+      } else {
+        // fallback to copy approach
+        if (buffer == null) {
+          buffer = new Array[Byte](bufferLen)
+        }
+        val originalPos = chunk.position()
+        chunk.rewind()
+        var bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        while (bytesToRead > 0) {
+          chunk.get(buffer, 0, bytesToRead)
+          out.write(buffer, 0, bytesToRead)
+          bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        }
+        chunk.position(originalPos)
+      }
+    })
+  }
+
+  /**
+   * write to ObjectOutput with zero copy if possible
+   */
+  override def writeExternal(out: ObjectOutput): Unit = {
+    // we want to keep the chunks layout
+    out.writeInt(chunks.length)
+    chunks.foreach(buffer => out.writeInt(buffer.limit()))
+    chunks.foreach(buffer => out.writeBoolean(buffer.isDirect))
+    var buffer: Array[Byte] = null
+    val bufferLen = 1024 * 1024;
+    chunks.foreach(chunk => {

Review Comment:
   nit:
   
   ```
   chunks.foreach { chunk =>
     ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org