Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/20 04:12:01 UTC

[GitHub] [spark] sadikovi commented on a diff in pull request #38064: [SPARK-40622][SQL][CORE] Result of a single task in collect() must fit in 2GB

sadikovi commented on code in PR #38064:
URL: https://github.com/apache/spark/pull/38064#discussion_r1000114683


##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {
+
+  /**
+   * @param estimatedSize estimated size of `t`, used as a hint to choose proper chunk size
+   */
+  def serializeToChunkedBuffer[T: ClassTag](
+      serializerInstance: SerializerInstance,
+      t: T,

Review Comment:
   nit: could you use a slightly more descriptive name? Thanks.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -42,8 +42,9 @@ import org.apache.spark.util.Utils
  *               buffers may also be used elsewhere then the caller is responsible for copying
  *               them as needed.
  */
-private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) extends Externalizable {

Review Comment:
   Can you confirm that these are copies of the byte buffers, so that with your changes there is no other reference that can be modified outside of the ChunkedByteBuffer?
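
For context on the copy question, a standalone JDK illustration (not Spark code): if getChunks() hands back ByteBuffer.duplicate()-style views, each view gets its own position/limit, but the underlying bytes stay shared, which is exactly what the question above is probing.

```scala
import java.nio.ByteBuffer

// duplicate() isolates the cursor state (position/limit/mark) of each view,
// but it does NOT copy the underlying bytes.
val original = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val view = original.duplicate()

view.get()                        // advances only view's position
assert(original.position() == 0)
original.put(0, 9.toByte)         // a write through either buffer...
assert(view.get(0) == 9.toByte)   // ...is visible through the other
```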



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,57 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to ObjectOutput with zero copy if possible

Review Comment:
   nit: `Writes to the provided ObjectOutput with zero copy if possible.`



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -207,6 +267,18 @@ private[spark] object ChunkedByteBuffer {
     }
     out.toChunkedByteBuffer
   }
+
+  /**
+   * Try to estimate appropriate chunk size so that it's not too large(waste memory) or too
+   * small(too many segments)

Review Comment:
   nit: space before (.



##########
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala:
##########
@@ -69,6 +78,45 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("Externalizable: writeExternal() and readExternal()") {
+    // intentionally generate arrays of different len, in order to verify the chunks layout
+    // is preserved after ser/deser
+    val byteArrays = (1 to 15).map(i => (0 until i).map(_.toByte).toArray)
+    val chunkedByteBuffer = new ChunkedByteBuffer(byteArrays.map(ByteBuffer.wrap).toArray)
+    val baos = new ByteArrayOutputStream()
+    val objOut = new ObjectOutputStream(baos)
+    chunkedByteBuffer.writeExternal(objOut)
+    objOut.close()
+    assert(chunkedByteBuffer.chunks.forall(_.position() == 0))
+
+    val chunkedByteBuffer2 = {
+      val tmp = new ChunkedByteBuffer
+      tmp.readExternal(new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)))
+      tmp
+    }
+    assertBufferEqual(chunkedByteBuffer, chunkedByteBuffer2)
+  }
+
+  test(
+    "Externalizable: writeExternal() and readExternal() should handle off-heap buffer properly") {

Review Comment:
   nit:
   ```
     ....
   ) {
     val ...
   ```



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,57 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to ObjectOutput with zero copy if possible

Review Comment:
   Also, could you elaborate on zero copy? It seems a copy is required regardless.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,57 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to ObjectOutput with zero copy if possible
+   */
+  override def writeExternal(out: ObjectOutput): Unit = {
+    // we want to keep the chunks layout
+    out.writeInt(chunks.length)
+    val chunksCopy = getChunks()
+    chunksCopy.foreach(buffer => out.writeInt(buffer.limit()))
+    var buffer: Array[Byte] = null
+    val bufferLen = ChunkedByteBuffer.estimateBufferChunkSize(size)
+
+    chunksCopy.foreach { chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by bytes array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())

Review Comment:
   I believe this would still incur a copy, unless the implementation stores the reference and position.
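
A side note for readers of the thread: the hasArray branch only avoids the extra staging array inside writeExternal; as noted above, the ObjectOutput implementation is free to buffer the bytes again internally, so "zero copy" is relative to this method. A small standalone illustration of the two buffer kinds (not part of the PR):

```scala
import java.nio.ByteBuffer

// Heap buffers expose their backing byte[]; direct buffers do not.
val heapBuf = ByteBuffer.allocate(16)          // hasArray == true
val directBuf = ByteBuffer.allocateDirect(16)  // hasArray == false

assert(heapBuf.hasArray)
assert(!directBuf.hasArray)

// heapBuf.array() can be handed straight to out.write(...), skipping a staging
// array in this method; directBuf's contents must first be drained into a heap
// array before they can be written.
```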



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -56,7 +57,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   /**
    * This size of this buffer, in bytes.
    */
-  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
+  private var _size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum

Review Comment:
   Is this needed for serialisation? Can you add a comment to explain why this is `var`? Thanks.
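
Background on the var question: java.io.Externalizable requires a public no-arg constructor, and readExternal() repopulates the instance after construction, so state derived from the chunks has to be reassignable. A minimal standalone sketch of that contract (SizedPayload is a made-up class, not Spark code):

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}

// The no-arg constructor builds an empty instance; readExternal() then mutates
// it, which is why derived state such as a cached size cannot be a val.
class SizedPayload(private var payload: Array[Byte]) extends Externalizable {
  private var _size: Long = payload.length

  def this() = this(Array.emptyByteArray)  // required for deserialization

  def size: Long = _size

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  override def readExternal(in: ObjectInput): Unit = {
    val len = in.readInt()
    payload = new Array[Byte](len)
    in.readFully(payload)
    _size = len                            // reassigned after construction
  }
}
```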



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,57 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to ObjectOutput with zero copy if possible
+   */
+  override def writeExternal(out: ObjectOutput): Unit = {
+    // we want to keep the chunks layout

Review Comment:
   nit: start the comment with an uppercase `We ...`. Can you uppercase the other comments in the method as well?



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -84,6 +91,57 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }
 
+  /**
+   * write to ObjectOutput with zero copy if possible
+   */
+  override def writeExternal(out: ObjectOutput): Unit = {
+    // we want to keep the chunks layout
+    out.writeInt(chunks.length)
+    val chunksCopy = getChunks()
+    chunksCopy.foreach(buffer => out.writeInt(buffer.limit()))
+    var buffer: Array[Byte] = null
+    val bufferLen = ChunkedByteBuffer.estimateBufferChunkSize(size)
+
+    chunksCopy.foreach { chunk => {
+      if (chunk.hasArray) {
+        // zero copy if the bytebuffer is backed by bytes array
+        out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
+      } else {
+        // fallback to copy approach
+        if (buffer == null) {
+          buffer = new Array[Byte](bufferLen)
+        }
+        var bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        while (bytesToRead > 0) {
+          chunk.get(buffer, 0, bytesToRead)
+          out.write(buffer, 0, bytesToRead)
+          bytesToRead = Math.min(chunk.remaining(), bufferLen)
+        }
+      }
+    }}
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    val chunksNum = in.readInt()
+    val indices = 0 until chunksNum
+    val chunksSize = indices.map(_ => in.readInt())
+    val chunks = new Array[ByteBuffer](chunksNum)
+
+    // We deserialize all chunks into on-heap buffer by default. If we have use case in the future
+    // where we want to preserve the on-heap/off-heap nature of chunks, then we need to record the
+    // `isDirect` property of each chunk during serialization
+    indices.foreach { i => {

Review Comment:
   nit: you don't need the second `{`.
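
To spell the nit out, the closure body can follow the `=>` directly, so the inner brace pair is redundant (a generic illustration, not the PR's exact code):

```scala
// Redundant inner braces around the lambda body:
Seq(1, 2, 3).foreach { i => {
  println(i)
}}

// Equivalent and idiomatic:
Seq(1, 2, 3).foreach { i =>
  println(i)
}
```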



##########
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala:
##########
@@ -28,6 +29,14 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 
 class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
 
+  def assertBufferEqual(buffer1: ChunkedByteBuffer, buffer2: ChunkedByteBuffer): Unit = {
+    assert(buffer1.chunks.length == buffer2.chunks.length)
+    assert(buffer1.chunks.zip(buffer2.chunks).forall{

Review Comment:
   nit: space before {.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -207,6 +267,18 @@ private[spark] object ChunkedByteBuffer {
     }
     out.toChunkedByteBuffer
   }
+
+  /**
+   * Try to estimate appropriate chunk size so that it's not too large(waste memory) or too

Review Comment:
   nit: space before (.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -207,6 +267,18 @@ private[spark] object ChunkedByteBuffer {
     }
     out.toChunkedByteBuffer
   }
+
+  /**
+   * Try to estimate appropriate chunk size so that it's not too large(waste memory) or too
+   * small(too many segments)
+   */
+  def estimateBufferChunkSize(estimatedSize: Long = -1): Int = {
+    if (estimatedSize < 0) {
+      CHUNK_BUFFER_SIZE
+    } else {
+      Math.max(Math.min(estimatedSize >> 3, CHUNK_BUFFER_SIZE).toInt, MINIMUM_CHUNK_BUFFER_SIZE)

Review Comment:
   +1
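
To make the heuristic concrete, here is a worked example of the clamping. The constant values below are assumptions for illustration; the real ones live in the ChunkedByteBuffer companion object and may differ:

```scala
// Assumed constants for the example only.
val CHUNK_BUFFER_SIZE = 1024 * 1024   // assumed 1 MiB cap
val MINIMUM_CHUNK_BUFFER_SIZE = 1024  // assumed 1 KiB floor

// estimatedSize >> 3 aims for roughly 8 chunks per buffer, clamped to the range.
def estimate(estimatedSize: Long): Int =
  if (estimatedSize < 0) CHUNK_BUFFER_SIZE
  else math.max(math.min(estimatedSize >> 3, CHUNK_BUFFER_SIZE).toInt, MINIMUM_CHUNK_BUFFER_SIZE)

assert(estimate(-1L) == CHUNK_BUFFER_SIZE)               // unknown size: default chunk size
assert(estimate(4L * 1024) == MINIMUM_CHUNK_BUFFER_SIZE) // tiny payload: floor kicks in
assert(estimate(1L * 1024 * 1024) == 128 * 1024)         // 1 MiB payload: 128 KiB chunks
assert(estimate(1L << 31) == CHUNK_BUFFER_SIZE)          // huge payload: capped at the max
```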



##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {

Review Comment:
   I was thinking: if this is specific to the chunked byte buffer, would it be beneficial to add this to ChunkedByteBuffer as a static method, e.g. `ChunkedByteBuffer.serialize(...)`? Just a question to see whether there is utility in a separate class.
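
For what that could look like, a purely illustrative sketch of the idea (the method name, the estimatedSize hint, and the body are assumptions, not quoted from the PR):

```scala
package org.apache.spark.util.io

import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.serializer.SerializerInstance

// Hypothetical shape of the suggestion: hang the helper off the companion
// object (existing members elided) instead of a separate SerializerHelper.
private[spark] object ChunkedByteBuffer {
  // ... existing members such as estimateBufferChunkSize elided ...

  def serialize[T: ClassTag](
      serializerInstance: SerializerInstance,
      value: T,
      estimatedSize: Long = -1L): ChunkedByteBuffer = {
    val output = new ChunkedByteBufferOutputStream(
      estimateBufferChunkSize(estimatedSize), ByteBuffer.allocate)
    val stream = serializerInstance.serializeStream(output)
    // Closing the serialization stream also closes the chunked output stream.
    try stream.writeObject(value) finally stream.close()
    output.toChunkedByteBuffer
  }
}
```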
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

