You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/09/24 23:21:09 UTC
[drill] 03/04: DRILL-6746: Query can hang when PartitionSender task
thread sees a connection failure while sending data batches to remote
fragment
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit e82f5ef689f1051b6cffc17921462e8e5524629b
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Sun Sep 16 21:01:15 2018 -0700
DRILL-6746: Query can hang when PartitionSender task thread sees a connection failure while sending data batches to remote fragment
closes #1470
---
.../drill/exec/work/batch/BaseRawBatchBuffer.java | 39 ++++++++++++++++++----
.../exec/work/batch/SpoolingRawBatchBuffer.java | 17 ++++++----
.../exec/work/batch/UnlimitedRawBatchBuffer.java | 10 ++++++
3 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e..6d77d63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.work.batch;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,8 +37,9 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
protected interface BufferQueue<T> {
void addOomBatch(RawFragmentBatch batch);
- RawFragmentBatch poll() throws IOException;
+ RawFragmentBatch poll() throws IOException, InterruptedException;
RawFragmentBatch take() throws IOException, InterruptedException;
+ RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException;
boolean checkForOutOfMemory();
int size();
boolean isEmpty();
@@ -127,17 +129,24 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
* responses pending
*/
private void clearBufferWithBody() {
+ RawFragmentBatch batch;
while (!bufferQueue.isEmpty()) {
- final RawFragmentBatch batch;
+ batch = null;
try {
batch = bufferQueue.poll();
assertAckSent(batch);
} catch (IOException e) {
context.getExecutorState().fail(e);
continue;
- }
- if (batch.getBody() != null) {
- batch.getBody().release();
+ } catch (InterruptedException e) {
+ context.getExecutorState().fail(e);
+ // keep the state that the thread is interrupted
+ Thread.currentThread().interrupt();
+ continue;
+ } finally {
+ if (batch != null && batch.getBody() != null) {
+ batch.getBody().release();
+ }
}
}
}
@@ -167,7 +176,25 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
// if we didn't get a batch, block on waiting for queue.
if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
- b = bufferQueue.take();
+ // We shouldn't block infinitely here. There can be a condition such that due to a failure FragmentExecutor
+ // state is changed to FAILED and queue is empty. Because of this the minor fragment main thread will block
+ // here waiting for the next batch to arrive. Meanwhile, when the next batch arrives to be enqueued, the enqueue path
+ // sees the FragmentExecutor failure state, doesn't enqueue the batch, and cleans up the buffer queue. Hence this
+ // thread will be stuck forever. So we poll for 5 seconds until we get a batch or FragmentExecutor state is in
+ // error condition.
+ while (b == null) {
+ b = bufferQueue.poll(5, TimeUnit.SECONDS);
+ if (!context.getExecutorState().shouldContinue()) {
+ kill(context);
+ if (b != null) {
+ assertAckSent(b);
+ if (b.getBody() != null) {
+ b.getBody().release();
+ }
+ b = null;
+ }
+ } // else b will be assigned a valid batch
+ }
}
} catch (final InterruptedException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 5d4b3a1..50f582d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -102,14 +102,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
}
@Override
- public RawFragmentBatch poll() throws IOException {
+ public RawFragmentBatch poll() throws IOException, InterruptedException {
RawFragmentBatchWrapper batchWrapper = buffer.poll();
if (batchWrapper != null) {
- try {
- return batchWrapper.get();
- } catch (InterruptedException e) {
- return null;
- }
+ return batchWrapper.get();
}
return null;
}
@@ -120,6 +116,15 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
}
@Override
+ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+ RawFragmentBatchWrapper batchWrapper = buffer.poll(timeout, timeUnit);
+ if (batchWrapper != null) {
+ return batchWrapper.get();
+ }
+ return null;
+ }
+
+ @Override
public boolean checkForOutOfMemory() {
return buffer.peek().isOutOfMemory();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bf14a74..0d36d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -64,6 +65,15 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch
}
@Override
+ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+ RawFragmentBatch batch = buffer.poll(timeout, timeUnit);
+ if (batch != null) {
+ batch.sendOk();
+ }
+ return batch;
+ }
+
+ @Override
public boolean checkForOutOfMemory() {
return context.getAllocator().isOverLimit();
}