You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/09/24 23:21:09 UTC

[drill] 03/04: DRILL-6746: Query can hang when PartitionSender task thread sees a connection failure while sending data batches to remote fragment

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e82f5ef689f1051b6cffc17921462e8e5524629b
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Sun Sep 16 21:01:15 2018 -0700

    DRILL-6746: Query can hang when PartitionSender task thread sees a connection failure while sending data batches to remote fragment
    
    closes #1470
---
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  | 39 ++++++++++++++++++----
 .../exec/work/batch/SpoolingRawBatchBuffer.java    | 17 ++++++----
 .../exec/work/batch/UnlimitedRawBatchBuffer.java   | 10 ++++++
 3 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e..6d77d63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,8 +37,9 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
   protected interface BufferQueue<T> {
     void addOomBatch(RawFragmentBatch batch);
-    RawFragmentBatch poll() throws IOException;
+    RawFragmentBatch poll() throws IOException, InterruptedException;
     RawFragmentBatch take() throws IOException, InterruptedException;
+    RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException;
     boolean checkForOutOfMemory();
     int size();
     boolean isEmpty();
@@ -127,17 +129,24 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
    * responses pending
    */
   private void clearBufferWithBody() {
+    RawFragmentBatch batch;
     while (!bufferQueue.isEmpty()) {
-      final RawFragmentBatch batch;
+      batch = null;
       try {
         batch = bufferQueue.poll();
         assertAckSent(batch);
       } catch (IOException e) {
         context.getExecutorState().fail(e);
         continue;
-      }
-      if (batch.getBody() != null) {
-        batch.getBody().release();
+      } catch (InterruptedException e) {
+        context.getExecutorState().fail(e);
+        // restore the thread's interrupted status so callers can still observe it
+        Thread.currentThread().interrupt();
+        continue;
+      } finally {
+        if (batch != null && batch.getBody() != null) {
+          batch.getBody().release();
+        }
       }
     }
   }
@@ -167,7 +176,25 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
       // if we didn't get a batch, block on waiting for queue.
       if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
-        b = bufferQueue.take();
+        // We shouldn't block indefinitely here. Due to a failure, the FragmentExecutor state can be changed to FAILED
+        // while the queue is empty. Because of this, the minor fragment main thread would block here waiting for the
+        // next batch to arrive. Meanwhile, when the next batch arrives to be enqueued, the enqueue path sees the
+        // FragmentExecutor failure state, doesn't enqueue the batch, and cleans up the buffer queue. Hence this
+        // thread would be stuck forever. So we poll in 5-second intervals until we get a batch or the
+        // FragmentExecutor state is in an error condition.
+        while (b == null) {
+          b = bufferQueue.poll(5, TimeUnit.SECONDS);
+          if (!context.getExecutorState().shouldContinue()) {
+            kill(context);
+            if (b != null) {
+              assertAckSent(b);
+              if (b.getBody() != null) {
+                b.getBody().release();
+              }
+              b = null;
+            }
+          } // else b will be assigned a valid batch
+        }
       }
     } catch (final InterruptedException e) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 5d4b3a1..50f582d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -102,14 +102,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
     }
 
     @Override
-    public RawFragmentBatch poll() throws IOException {
+    public RawFragmentBatch poll() throws IOException, InterruptedException {
       RawFragmentBatchWrapper batchWrapper = buffer.poll();
       if (batchWrapper != null) {
-        try {
-          return batchWrapper.get();
-        } catch (InterruptedException e) {
-          return null;
-        }
+        return batchWrapper.get();
       }
       return null;
     }
@@ -120,6 +116,15 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
     }
 
     @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+      RawFragmentBatchWrapper batchWrapper = buffer.poll(timeout, timeUnit);
+      if (batchWrapper != null) {
+        return batchWrapper.get();
+      }
+      return null;
+    }
+
+    @Override
     public boolean checkForOutOfMemory() {
       return buffer.peek().isOutOfMemory();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bf14a74..0d36d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -64,6 +65,15 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch
     }
 
     @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+      RawFragmentBatch batch = buffer.poll(timeout, timeUnit);
+      if (batch != null) {
+        batch.sendOk();
+      }
+      return batch;
+    }
+
+    @Override
     public boolean checkForOutOfMemory() {
       return context.getAllocator().isOverLimit();
     }