You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/01 13:01:25 UTC

spark git commit: [SPARK-23289][CORE] OneForOneBlockFetcher.DownloadCallback.onData should write the buffer fully

Repository: spark
Updated Branches:
  refs/heads/master ffbca8451 -> ec63e2d07


[SPARK-23289][CORE] OneForOneBlockFetcher.DownloadCallback.onData should write the buffer fully

## What changes were proposed in this pull request?

`channel.write(buf)` may not write the whole buffer since the underlying channel is a FileChannel, we should retry until the whole buffer is written.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zs...@gmail.com>

Closes #20461 from zsxwing/SPARK-23289.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec63e2d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec63e2d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec63e2d0

Branch: refs/heads/master
Commit: ec63e2d0743a4f75e1cce21d0fe2b54407a86a4a
Parents: ffbca84
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu Feb 1 21:00:47 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 21:00:47 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/network/shuffle/OneForOneBlockFetcher.java | 4 +++-
 core/src/test/scala/org/apache/spark/FileSuite.scala            | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec63e2d0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 9cac7d0..0bc5718 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -171,7 +171,9 @@ public class OneForOneBlockFetcher {
 
     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
-      channel.write(buf);
+      while (buf.hasRemaining()) {
+        channel.write(buf);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/ec63e2d0/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index e9539dc..55a9122 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -244,7 +244,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     for (i <- 0 until testOutputCopies) {
       // Shift values by i so that they're different in the output
       val alteredOutput = testOutput.map(b => (b + i).toByte)
-      channel.write(ByteBuffer.wrap(alteredOutput))
+      val buffer = ByteBuffer.wrap(alteredOutput)
+      while (buffer.hasRemaining) {
+        channel.write(buffer)
+      }
     }
     channel.close()
     file.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org