Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/13 04:40:50 UTC
spark git commit: [SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once
Repository: spark
Updated Branches:
refs/heads/master 21cb59f1c -> edeb51a39
[SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once
## What changes were proposed in this pull request?
The CompactibleFileStreamLog materializes the whole metadata log in memory as a single String. This can cause issues when a large number of files are being committed, especially during a compaction batch.
You may come across stack traces that look like:
```
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.serialize(FileStreamSinkLog.scala:127)
```
The safer approach is to write to an output stream, so that we never have to materialize the entire log as one huge string.
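For illustration, a minimal sketch of the two shapes (the names `serializeMaterialized`/`serializeStreaming` are ours, not Spark's; the streaming version mirrors the new `serialize` in the diff below):
```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets.UTF_8

// Old shape: builds one giant String before any bytes are written.
// With enough entries the intermediate String can exceed the maximum
// array size and trigger the OutOfMemoryError shown above.
def serializeMaterialized(version: String, entries: Seq[String]): Array[Byte] =
  (version +: entries).mkString("\n").getBytes(UTF_8)

// New shape: each entry is encoded and written as it is visited, so
// peak memory is bounded by the largest single entry, not the whole log.
def serializeStreaming(version: String, entries: Seq[String], out: OutputStream): Unit = {
  out.write(version.getBytes(UTF_8))
  entries.foreach { e =>
    out.write('\n')
    out.write(e.getBytes(UTF_8))
  }
}
```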
## How was this patch tested?
Existing unit tests
Author: Burak Yavuz <br...@gmail.com>
Closes #15437 from brkyvz/ser-to-stream.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edeb51a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edeb51a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edeb51a3
Branch: refs/heads/master
Commit: edeb51a39d76d64196d7635f52be1b42c7ec4341
Parents: 21cb59f
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Oct 12 21:40:45 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Oct 12 21:40:45 2016 -0700
----------------------------------------------------------------------
.../streaming/CompactibleFileStreamLog.scala | 22 +++++++++------
.../execution/streaming/HDFSMetadataLog.scala | 29 ++++++++++----------
.../streaming/FileStreamSinkLogSuite.scala | 14 ++++++----
3 files changed, 38 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 027b5bb..c14feea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.execution.streaming
-import java.io.IOException
+import java.io.{InputStream, IOException, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8
+import scala.io.{Source => IOSource}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{Path, PathFilter}
@@ -93,20 +94,25 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
}
}
- override def serialize(logData: Array[T]): Array[Byte] = {
- (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
+ override def serialize(logData: Array[T], out: OutputStream): Unit = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ out.write(metadataLogVersion.getBytes(UTF_8))
+ logData.foreach { data =>
+ out.write('\n')
+ out.write(serializeData(data).getBytes(UTF_8))
+ }
}
- override def deserialize(bytes: Array[Byte]): Array[T] = {
- val lines = new String(bytes, UTF_8).split("\n")
- if (lines.length == 0) {
+ override def deserialize(in: InputStream): Array[T] = {
+ val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+ if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
- val version = lines(0)
+ val version = lines.next()
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- lines.slice(1, lines.length).map(deserializeData)
+ lines.map(deserializeData).toArray
}
override def add(batchId: Long, logs: Array[T]): Boolean = {
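As an aside, the new `deserialize` leans on `scala.io.Source.getLines()` returning a lazy `Iterator[String]`. A self-contained sketch of that pattern with hypothetical names (this is not Spark's API), e.g. in a REPL:
```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.{Source => IOSource}

def deserializeLines(in: InputStream, expectedVersion: String): Array[String] = {
  val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
  if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")
  val version = lines.next()
  if (version != expectedVersion) {
    throw new IllegalStateException(s"Unknown log version: $version")
  }
  // getLines() is lazy: each line is decoded only as it is consumed;
  // toArray forces the remaining lines into the result collection.
  lines.toArray
}

val bytes = "v1\na\nb".getBytes(UTF_8)
assert(deserializeLines(new ByteArrayInputStream(bytes), "v1").sameElements(Array("a", "b")))
```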
http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 39a0f33..c7235320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.execution.streaming
-import java.io.{FileNotFoundException, IOException}
-import java.nio.ByteBuffer
+import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
import java.util.{ConcurrentModificationException, EnumSet, UUID}
import scala.reflect.ClassTag
@@ -29,7 +28,6 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread
@@ -88,12 +86,16 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
}
}
- protected def serialize(metadata: T): Array[Byte] = {
- JavaUtils.bufferToArray(serializer.serialize(metadata))
+ protected def serialize(metadata: T, out: OutputStream): Unit = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ val outStream = serializer.serializeStream(out)
+ outStream.writeObject(metadata)
}
- protected def deserialize(bytes: Array[Byte]): T = {
- serializer.deserialize[T](ByteBuffer.wrap(bytes))
+ protected def deserialize(in: InputStream): T = {
+ // called inside a try-finally where the underlying stream is closed in the caller
+ val inStream = serializer.deserializeStream(in)
+ inStream.readObject[T]()
}
/**
@@ -114,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
// Only write metadata when the batch has not yet been written
Thread.currentThread match {
case ut: UninterruptibleThread =>
- ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
+ ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
@@ -129,7 +131,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
- private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
+ private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
// Use nextId to create a temp file
var nextId = 0
while (true) {
@@ -137,9 +139,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
try {
val output = fileManager.create(tempPath)
try {
- output.write(bytes)
+ writer(metadata, output)
} finally {
- output.close()
+ IOUtils.closeQuietly(output)
}
try {
// Try to commit the batch
@@ -193,10 +195,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
if (fileManager.exists(batchMetadataFile)) {
val input = fileManager.open(batchMetadataFile)
try {
- val bytes = IOUtils.toByteArray(input)
- Some(deserialize(bytes))
+ Some(deserialize(input))
} finally {
- input.close()
+ IOUtils.closeQuietly(input)
}
} else {
logDebug(s"Unable to find batch $batchMetadataFile")
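The key refactor in this file is that `writeBatch` now takes a writer callback of type `(T, OutputStream) => Unit` instead of a pre-materialized `Array[Byte]`, so the caller streams metadata directly into the file. A stripped-down sketch of that pattern with hypothetical names (our names, not Spark's):
```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// The caller supplies a function that writes metadata into whatever
// OutputStream is opened, instead of handing over a finished byte array.
def writeBatch[T](open: () => OutputStream, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
  val out = open()
  try writer(metadata, out)
  finally out.close() // the real code uses IOUtils.closeQuietly(output)
}

// Usage: serialize a record as UTF-8 text with no intermediate byte array.
val sink = new ByteArrayOutputStream()
writeBatch(() => sink, "some metadata", (m: String, o: OutputStream) => o.write(m.getBytes(UTF_8)))
assert(sink.toString(UTF_8.name()) == "some metadata")
```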
http://git-wip-us.apache.org/repos/asf/spark/blob/edeb51a3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 41a8cc2..e1bc674 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.SparkFunSuite
@@ -133,9 +134,12 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
// scalastyle:on
- assert(expected === new String(sinkLog.serialize(logs), UTF_8))
-
- assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
+ val baos = new ByteArrayOutputStream()
+ sinkLog.serialize(logs, baos)
+ assert(expected === baos.toString(UTF_8.name()))
+ baos.reset()
+ sinkLog.serialize(Array(), baos)
+ assert(VERSION === baos.toString(UTF_8.name()))
}
}
@@ -174,9 +178,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
blockSize = 30000L,
action = FileStreamSinkLog.ADD_ACTION))
- assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
+ assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
- assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
+ assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
}
}