You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/08/07 16:43:34 UTC

[asterixdb] branch master updated: [NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 32eed5f  [NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic
32eed5f is described below

commit 32eed5f384c5851eae1c613fcb3b9532744ed595
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Tue Aug 6 11:55:20 2019 -0700

    [NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - To avoid synchronization issues that might occur
      due to JVM reordering, ensure that both recycling
      read buffers and notifying the sender  of their
      availability are done atomically before the next
      buffer is received from the sender.
    
    Change-Id: Ia3b1920f33bf7d4e7efbd2ea3405cbc4310a78c7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3520
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../muxdemux/FullFrameChannelReadInterface.java    | 89 ++++++++++++----------
 1 file changed, 47 insertions(+), 42 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 03ceb96..5a23fb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -20,8 +20,8 @@ package org.apache.hyracks.net.protocols.muxdemux;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,65 +33,70 @@ import org.apache.logging.log4j.Logger;
 public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final BlockingDeque<ByteBuffer> riEmptyStack;
+    private final Deque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
+    private final Object bufferRecycleLock = new Object();
 
     public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new LinkedBlockingDeque<>();
+        riEmptyStack = new ArrayDeque<>();
         credits = 0;
 
         emptyBufferAcceptor = buffer -> {
-            if (ccb.isRemotelyClosed()) {
-                return;
-            }
             final int delta = buffer.remaining();
-            riEmptyStack.push(buffer);
-            ccb.addPendingCredits(delta);
+            synchronized (bufferRecycleLock) {
+                if (ccb.isRemotelyClosed()) {
+                    return;
+                }
+                riEmptyStack.push(buffer);
+                ccb.addPendingCredits(delta);
+            }
         };
     }
 
     @Override
     public int read(ISocketChannel sc, int size) throws IOException, NetException {
-        while (true) {
-            if (size <= 0) {
-                return size;
-            }
-            if (currentReadBuffer == null) {
-                currentReadBuffer = riEmptyStack.poll();
-                //if current buffer == null and limit not reached
-                // factory.createBuffer factory
-                if (currentReadBuffer == null) {
-                    currentReadBuffer = bufferFactory.createBuffer();
+        synchronized (bufferRecycleLock) {
+            while (true) {
+                if (size <= 0) {
+                    return size;
                 }
-            }
-            if (currentReadBuffer == null) {
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
+                if (currentReadBuffer == null) {
+                    currentReadBuffer = riEmptyStack.poll();
+                    //if current buffer == null and limit not reached
+                    // factory.createBuffer factory
+                    if (currentReadBuffer == null) {
+                        currentReadBuffer = bufferFactory.createBuffer();
+                    }
                 }
-                throw new IllegalStateException(ccb + " read buffers exceeded");
-            }
-            int rSize = Math.min(size, currentReadBuffer.remaining());
-            if (rSize > 0) {
-                currentReadBuffer.limit(currentReadBuffer.position() + rSize);
-                int len;
-                try {
-                    len = sc.read(currentReadBuffer);
-                    if (len < 0) {
-                        throw new NetException("Socket Closed");
+                if (currentReadBuffer == null) {
+                    if (LOGGER.isWarnEnabled()) {
+                        LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
                     }
-                } finally {
-                    currentReadBuffer.limit(currentReadBuffer.capacity());
+                    throw new IllegalStateException(ccb + " read buffers exceeded");
                 }
-                size -= len;
-                if (len < rSize) {
+                int rSize = Math.min(size, currentReadBuffer.remaining());
+                if (rSize > 0) {
+                    currentReadBuffer.limit(currentReadBuffer.position() + rSize);
+                    int len;
+                    try {
+                        len = sc.read(currentReadBuffer);
+                        if (len < 0) {
+                            throw new NetException("Socket Closed");
+                        }
+                    } finally {
+                        currentReadBuffer.limit(currentReadBuffer.capacity());
+                    }
+                    size -= len;
+                    if (len < rSize) {
+                        return size;
+                    }
+                } else {
                     return size;
                 }
-            } else {
-                return size;
-            }
-            if (currentReadBuffer.remaining() <= 0) {
-                flush();
+                if (currentReadBuffer.remaining() <= 0) {
+                    flush();
+                }
             }
         }
     }