Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/15 10:54:07 UTC

[GitHub] attilapiros commented on a change in pull request #23602: [SPARK-26674][CORE]Consolidate CompositeByteBuf when reading large frame

URL: https://github.com/apache/spark/pull/23602#discussion_r257186965
 
 

 ##########
 File path: common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
 ##########
 @@ -123,30 +140,54 @@ private long decodeFrameSize() {
 
   private ByteBuf decodeNext() {
     long frameSize = decodeFrameSize();
-    if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
+    if (frameSize == UNKNOWN_FRAME_SIZE) {
       return null;
     }
 
-    // Reset size for next frame.
-    nextFrameSize = UNKNOWN_FRAME_SIZE;
-
-    Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
-    Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
+    if (frameBuf == null) {
+      Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
+          "Too large frame: %s", frameSize);
+      Preconditions.checkArgument(frameSize > 0,
+          "Frame length should be positive: %s", frameSize);
+      frameRemainingBytes = (int) frameSize;
 
-    // If the first buffer holds the entire frame, return it.
-    int remaining = (int) frameSize;
-    if (buffers.getFirst().readableBytes() >= remaining) {
-      return nextBufferForFrame(remaining);
+      // If buffers is empty, then return immediately for more input data.
+      if (buffers.isEmpty()) {
+        return null;
+      }
+      // Otherwise, if the first buffer holds the entire frame, we attempt to
+      // build frame with it and return.
+      if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
+        // Reset buf and size for next frame.
+        frameBuf = null;
+        nextFrameSize = UNKNOWN_FRAME_SIZE;
+        return nextBufferForFrame(frameRemainingBytes);
+      }
+      // Other cases, create a composite buffer to manage all the buffers.
+      frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
     }
 
-    // Otherwise, create a composite buffer.
-    CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
-    while (remaining > 0) {
-      ByteBuf next = nextBufferForFrame(remaining);
-      remaining -= next.readableBytes();
-      frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
+    while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
+      ByteBuf next = nextBufferForFrame(frameRemainingBytes);
+      frameRemainingBytes -= next.readableBytes();
+      frameBuf.addComponent(true, next);
     }
-    assert remaining == 0;
+    // If the delta size of frameBuf exceeds the threshold, then we do consolidation
+    // to reduce memory consumption.
+    if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateThreshold) {
+      int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
+      frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
+      consolidatedFrameBufSize = frameBuf.capacity();
+      consolidatedNumComponents = frameBuf.numComponents();
+    }
+    if (frameRemainingBytes > 0) {
+      return null;
+    }
+
+    // Reset buf and size for next frame.
+    ByteBuf frame = frameBuf;
+    frameBuf = null;
+    nextFrameSize = UNKNOWN_FRAME_SIZE;
 
 Review comment:
   You have to reset `consolidatedFrameBufSize` and `consolidatedNumComponents` back to 0 for the next frame buffer.
   
   Otherwise, after one very large frame, every later frame that is smaller but still quite large is not consolidated at all, because `frameBuf.capacity() - consolidatedFrameBufSize` never exceeds `consolidateThreshold`.
   And when consolidation does start again, for a frame bigger than the largest one seen so far, only the components added after the previous maximum are consolidated.
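
To illustrate the effect, here is a minimal sketch that models only the size bookkeeping, without Netty. The class `ConsolidationBookkeeping`, its `readFrame` method, and the `resetBetweenFrames` flag are hypothetical names made up for this example and are not part of the PR; only `consolidatedFrameBufSize`, `consolidatedNumComponents`, and `consolidateThreshold` come from the diff above. It assumes each frame arrives as equally sized chunks and counts how often the threshold check would fire.

```java
// Hypothetical model of the decoder's consolidation bookkeeping (not the PR's code).
class ConsolidationBookkeeping {
  private final long consolidateThreshold;
  private final boolean resetBetweenFrames;

  // Same names as the fields in the diff; here they are plain counters.
  private long consolidatedFrameBufSize = 0;
  private int consolidatedNumComponents = 0;

  ConsolidationBookkeeping(long consolidateThreshold, boolean resetBetweenFrames) {
    this.consolidateThreshold = consolidateThreshold;
    this.resetBetweenFrames = resetBetweenFrames;
  }

  /** Feeds one frame as `chunks` buffers of `chunkSize` bytes; returns how many consolidations fire. */
  int readFrame(int chunks, long chunkSize) {
    long capacity = 0;       // stands in for frameBuf.capacity()
    int numComponents = 0;   // stands in for frameBuf.numComponents(), simplified
    int consolidations = 0;
    for (int i = 0; i < chunks; i++) {
      capacity += chunkSize;
      numComponents++;
      // Same condition as in the diff.
      if (capacity - consolidatedFrameBufSize > consolidateThreshold) {
        consolidations++;    // the real code would call frameBuf.consolidate(...) here
        consolidatedFrameBufSize = capacity;
        consolidatedNumComponents = numComponents;
      }
    }
    if (resetBetweenFrames) {
      // The suggested fix: start the next frame from a clean state.
      consolidatedFrameBufSize = 0;
      consolidatedNumComponents = 0;
    }
    return consolidations;
  }

  public static void main(String[] args) {
    ConsolidationBookkeeping stale = new ConsolidationBookkeeping(10, false);
    System.out.println("no reset, big frame:   " + stale.readFrame(10, 4));
    System.out.println("no reset, small frame: " + stale.readFrame(5, 4));

    ConsolidationBookkeeping fixed = new ConsolidationBookkeeping(10, true);
    System.out.println("reset, big frame:      " + fixed.readFrame(10, 4));
    System.out.println("reset, small frame:    " + fixed.readFrame(5, 4));
  }
}
```

Without the reset, the second (smaller) frame never crosses the threshold because the stale size from the first frame is subtracted from its capacity. With the reset, it is consolidated as expected.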
    
   
