You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/01/13 23:30:13 UTC

[2/3] drill git commit: DRILL-1926 Fix for back pressure logic

DRILL-1926 Fix for back pressure logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2a410775
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2a410775
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2a410775

Branch: refs/heads/master
Commit: 2a4107753870aa408447fe6fb0becabcbb0691ef
Parents: a2190fa
Author: Yuliya Feldman <yf...@maprtech.com>
Authored: Wed Dec 17 14:58:43 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 13 14:29:20 2015 -0800

----------------------------------------------------------------------
 .../exec/work/batch/ResponseSenderQueue.java    |  17 ++-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  21 ++-
 .../work/batch/TestUnlimitedBatchBuffer.java    | 134 +++++++++++++++++++
 3 files changed, 164 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index 5a9316e..a7535c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -34,12 +34,25 @@ public class ResponseSenderQueue {
   }
 
   public void flushResponses(){
-    while(!q.isEmpty()){
+    flushResponses(Integer.MAX_VALUE);
+  }
+
+  /**
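+   * Flush at most {@code count} queued responses.
+   * @param count maximum number of queued responses to flush
+   * @return number of entries taken off the queue, never more than count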
+   */
+  public int flushResponses(int count){
+    logger.debug("queue.size: {}, count: {}", q.size(), count);
+    int i = 0;
+    while(!q.isEmpty() && i < count){
       ResponseSender s = q.poll();
       if(s != null){
         s.send(DataRpcConfig.OK);
       }
+      i++;
     }
-
+    return i;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 623a719..35aec93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -41,6 +41,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private volatile BufferState state = BufferState.INIT;
   private final int softlimit;
   private final int startlimit;
+  private final int bufferSizePerSocket;
   private final AtomicBoolean overlimit = new AtomicBoolean(false);
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
@@ -49,10 +50,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private FragmentContext context;
 
   public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
-    int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+    bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
 
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
+    logger.debug("softLimit: {}, startLimit: {}", softlimit, startlimit);
     this.buffer = Queues.newLinkedBlockingDeque();
     this.fragmentCount = fragmentCount;
     this.streamCounter = fragmentCount;
@@ -82,7 +84,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       return;
     }
     buffer.add(batch);
-    if (buffer.size() == softlimit) {
+    if (buffer.size() >= softlimit) {
+      logger.debug("buffer.size: {}", buffer.size());
       overlimit.set(true);
       readController.enqueueResponse(batch.getSender());
     } else {
@@ -167,11 +170,17 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
 
 
-    // if we are in the overlimit condition and aren't finished, check if we've passed the start limit.  If so, turn off the overlimit condition and set auto read to true (start reading from socket again).
+    // Try to flush as many queued responses as the difference between softlimit and queue size, so every
+    // flush reduces the backlog: the further the queue size is below softlimit, the more we can flush.
     if (!isFinished() && overlimit.get()) {
-      if (buffer.size() == startlimit) {
-        overlimit.set(false);
-        readController.flushResponses();
+      int flushCount = softlimit - buffer.size();
+      if ( flushCount > 0 ) {
+        int flushed = readController.flushResponses(flushCount);
+        logger.debug("flush {} entries, flushed {} entries ", flushCount, flushed);
+        if ( flushed == 0 ) {
+          // queue is empty - nothing to do for now
+          overlimit.set(false);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
new file mode 100644
index 0000000..15ee3f3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests whether back pressure is applied when the size of the queue of
+ * buffered raw batches reaches or exceeds the specified softLimit.
+ * It verifies that acknowledgments are queued and sent according to the
+ * expected schedule.
+ * If the algorithm for releasing acks changes in the future,
+ * this test will need to change as well.
+ * It does not test whether senders receive acknowledgments and act accordingly.
+ */
+public class TestUnlimitedBatchBuffer extends ExecTest {
+
+  private static int FRAGMENT_COUNT = 5;
+  private DrillConfig dc = DrillConfig.create();
+
+  private static class MySender implements ResponseSender {
+
+    private int sendCount = 0;
+
+    @Override
+    public void send(Response r) {
+      sendCount++;
+    }
+
+
+    public int getSendCount() {
+      return sendCount;
+    }
+
+    public void resetSender() {
+      sendCount = 0;
+    }
+  }
+  @Test
+  public void testBackPressure() throws Exception {
+
+    final MySender mySender = new MySender();
+    FragmentContext context = Mockito.mock(FragmentContext.class);
+
+    Mockito.when(context.getConfig()).thenReturn(dc);
+
+    UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
+
+    RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class);
+
+    Mockito.when(batch.getSender()).thenReturn(mySender);
+    Mockito.doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock ignore) throws Throwable {
+        mySender.send(DataRpcConfig.OK);
+        return null;
+      }
+    }).when(batch).sendOk();
+
+    FragmentRecordBatch header = FragmentRecordBatch.newBuilder().setIsOutOfMemory(false).setIsLastBatch(false).build();
+    Mockito.when(batch.getHeader()).thenReturn(header);
+
+    /// start the real test
+    int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+    int softLimit = incomingBufferSize * FRAGMENT_COUNT;
+
+    // No back pressure should be kicked in
+    for ( int i = 0; i < softLimit-1; i++) {
+      rawBuffer.enqueue(batch);
+    }
+
+    // number of responses sent == number of enqueued elements
+    assertEquals(softLimit - 1, mySender.getSendCount());
+    rawBuffer.getNext();
+
+    // set senderCount to 0
+    mySender.resetSender();
+
+    // test back pressure
+    // number of elements in the queue = softLimit -2
+    // enqueue softlimit elements more
+    for ( int i = 0; i < softLimit; i++) {
+      rawBuffer.enqueue(batch);
+    }
+    // only the first enqueue stays below softlimit and is acknowledged; the rest reach or exceed it, so senderCount stays at 1
+    assertEquals(1, mySender.getSendCount());
+
+    // other responses should be saved in the responsequeue
+    for (int i = 0; i < softLimit-2; i++ ) {
+      rawBuffer.getNext();
+    }
+
+    // still should not send responses, as queue.size has not dropped below softLimit yet
+    assertEquals(1, mySender.getSendCount());
+
+    // size of the queue == softLimit now
+    for (int i = softLimit; i > 0 ; i-- ) {
+      int senderCount = mySender.getSendCount();
+      rawBuffer.getNext();
+      int expectedCountNumber = softLimit - i + senderCount+1;
+      assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit), mySender.getSendCount());
+    }
+  }
+
+}