You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/02 03:36:09 UTC

spark git commit: [SPARK-6578] Small rewrite to make the logic more clear in MessageWithHeader.transferTo.

Repository: spark
Updated Branches:
  refs/heads/master 4815bc212 -> 899ebcb14


[SPARK-6578] Small rewrite to make the logic more clear in MessageWithHeader.transferTo.

Author: Reynold Xin <rx...@databricks.com>

Closes #5319 from rxin/SPARK-6578 and squashes the following commits:

7c62a64 [Reynold Xin] Small rewrite to make the logic more clear in transferTo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/899ebcb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/899ebcb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/899ebcb1

Branch: refs/heads/master
Commit: 899ebcb1448126f40be784ce42e69218e9a1ead7
Parents: 4815bc2
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Apr 1 18:36:06 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 1 18:36:06 2015 -0700

----------------------------------------------------------------------
 .../network/protocol/MessageWithHeader.java     | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/899ebcb1/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index 215a851..d686a95 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -21,15 +21,15 @@ import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.FileRegion;
 import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCountUtil;
 
 /**
- * A wrapper message that holds two separate pieces (a header and a body) to avoid
- * copying the body's content.
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
  */
 class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
 
@@ -63,32 +63,36 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
     return totalBytesTransferred;
   }
 
+  /**
+   * This code is more complicated than you would think because we might require multiple
+   * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
+   *
+   * The contract is that the caller will ensure position is properly set to the total number
+   * of bytes transferred so far (i.e. value returned by transfered()).
+   */
   @Override
-  public long transferTo(WritableByteChannel target, long position) throws IOException {
+  public long transferTo(final WritableByteChannel target, final long position) throws IOException {
     Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
-    long written = 0;
-
-    if (position < headerLength) {
-      written += copyByteBuf(header, target);
+    // Bytes written for header in this call.
+    long writtenHeader = 0;
+    if (header.readableBytes() > 0) {
+      writtenHeader = copyByteBuf(header, target);
+      totalBytesTransferred += writtenHeader;
       if (header.readableBytes() > 0) {
-        totalBytesTransferred += written;
-        return written;
+        return writtenHeader;
       }
     }
 
+    // Bytes written for body in this call.
+    long writtenBody = 0;
     if (body instanceof FileRegion) {
-      // Adjust the position. If the write is happening as part of the same call where the header
-      // (or some part of it) is written, `position` will be less than the header size, so we want
-      // to start from position 0 in the FileRegion object. Otherwise, we start from the position
-      // requested by the caller.
-      long bodyPos = position > headerLength ? position - headerLength : 0;
-      written += ((FileRegion)body).transferTo(target, bodyPos);
+      writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
     } else if (body instanceof ByteBuf) {
-      written += copyByteBuf((ByteBuf) body, target);
+      writtenBody = copyByteBuf((ByteBuf) body, target);
     }
+    totalBytesTransferred += writtenBody;
 
-    totalBytesTransferred += written;
-    return written;
+    return writtenHeader + writtenBody;
   }
 
   @Override
@@ -102,5 +106,4 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
     buf.skipBytes(written);
     return written;
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org