You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/13 04:40:50 UTC

spark git commit: [SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once

Repository: spark
Updated Branches:
  refs/heads/master 21cb59f1c -> edeb51a39


[SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once

## What changes were proposed in this pull request?

The CompactibleFileStreamLog materializes the whole metadata log in memory as a String. This can cause out-of-memory errors when a large number of files are being committed, especially during a compaction batch.
You may come across stack traces that look like:
```
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.serialize(FileStreamSinkLog.scala:127)

```
The safer way is to serialize directly to an output stream (and deserialize from an input stream) so that we don't have to materialize a huge string or byte array.

## How was this patch tested?

Existing unit tests

Author: Burak Yavuz <br...@gmail.com>

Closes #15437 from brkyvz/ser-to-stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edeb51a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edeb51a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edeb51a3

Branch: refs/heads/master
Commit: edeb51a39d76d64196d7635f52be1b42c7ec4341
Parents: 21cb59f
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Oct 12 21:40:45 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Oct 12 21:40:45 2016 -0700

----------------------------------------------------------------------
 .../streaming/CompactibleFileStreamLog.scala    | 22 +++++++++------
 .../execution/streaming/HDFSMetadataLog.scala   | 29 ++++++++++----------
 .../streaming/FileStreamSinkLogSuite.scala      | 14 ++++++----
 3 files changed, 38 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 027b5bb..c14feea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.IOException
+import java.io.{InputStream, IOException, OutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.{Path, PathFilter}
@@ -93,20 +94,25 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     }
   }
 
-  override def serialize(logData: Array[T]): Array[Byte] = {
-    (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
+  override def serialize(logData: Array[T], out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(metadataLogVersion.getBytes(UTF_8))
+    logData.foreach { data =>
+      out.write('\n')
+      out.write(serializeData(data).getBytes(UTF_8))
+    }
   }
 
-  override def deserialize(bytes: Array[Byte]): Array[T] = {
-    val lines = new String(bytes, UTF_8).split("\n")
-    if (lines.length == 0) {
+  override def deserialize(in: InputStream): Array[T] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines(0)
+    val version = lines.next()
     if (version != metadataLogVersion) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.slice(1, lines.length).map(deserializeData)
+    lines.map(deserializeData).toArray
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 39a0f33..c7235320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{FileNotFoundException, IOException}
-import java.nio.ByteBuffer
+import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
@@ -29,7 +28,6 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.UninterruptibleThread
@@ -88,12 +86,16 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     }
   }
 
-  protected def serialize(metadata: T): Array[Byte] = {
-    JavaUtils.bufferToArray(serializer.serialize(metadata))
+  protected def serialize(metadata: T, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val outStream = serializer.serializeStream(out)
+    outStream.writeObject(metadata)
   }
 
-  protected def deserialize(bytes: Array[Byte]): T = {
-    serializer.deserialize[T](ByteBuffer.wrap(bytes))
+  protected def deserialize(in: InputStream): T = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val inStream = serializer.deserializeStream(in)
+    inStream.readObject[T]()
   }
 
   /**
@@ -114,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       // Only write metadata when the batch has not yet been written
       Thread.currentThread match {
         case ut: UninterruptibleThread =>
-          ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
+          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
         case _ =>
           throw new IllegalStateException(
             "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
@@ -129,7 +131,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
+  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
     // Use nextId to create a temp file
     var nextId = 0
     while (true) {
@@ -137,9 +139,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       try {
         val output = fileManager.create(tempPath)
         try {
-          output.write(bytes)
+          writer(metadata, output)
         } finally {
-          output.close()
+          IOUtils.closeQuietly(output)
         }
         try {
           // Try to commit the batch
@@ -193,10 +195,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     if (fileManager.exists(batchMetadataFile)) {
       val input = fileManager.open(batchMetadataFile)
       try {
-        val bytes = IOUtils.toByteArray(input)
-        Some(deserialize(bytes))
+        Some(deserialize(input))
       } finally {
-        input.close()
+        IOUtils.closeQuietly(input)
       }
     } else {
       logDebug(s"Unable to find batch $batchMetadataFile")

http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 41a8cc2..e1bc674 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
 import org.apache.spark.SparkFunSuite
@@ -133,9 +134,12 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
           |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
           |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
       // scalastyle:on
-      assert(expected === new String(sinkLog.serialize(logs), UTF_8))
-
-      assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
+      val baos = new ByteArrayOutputStream()
+      sinkLog.serialize(logs, baos)
+      assert(expected === baos.toString(UTF_8.name()))
+      baos.reset()
+      sinkLog.serialize(Array(), baos)
+      assert(VERSION === baos.toString(UTF_8.name()))
     }
   }
 
@@ -174,9 +178,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
           blockSize = 30000L,
           action = FileStreamSinkLog.ADD_ACTION))
 
-      assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
+      assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
 
-      assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
+      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org