You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/03/30 14:41:20 UTC
[7/7] drill git commit: DRILL-6125: Fix possible memory leak when
query is cancelled or finished.
DRILL-6125: Fix possible memory leak when query is cancelled or finished.
close apache/drill#1105
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a264e7fe
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a264e7fe
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a264e7fe
Branch: refs/heads/master
Commit: a264e7feb1d02ffd5762bb1f652ea22d17aa5243
Parents: 03b245e
Author: Timothy Farkas <ti...@apache.org>
Authored: Tue Jan 30 15:55:41 2018 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu Mar 29 23:24:09 2018 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/RootExec.java | 23 ++-
.../PartitionSenderRootExec.java | 32 ++--
.../exec/work/fragment/FragmentExecutor.java | 179 ++++++++++++++-----
3 files changed, 160 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 5e366fb..ddeb3e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -20,19 +20,28 @@ package org.apache.drill.exec.physical.impl;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
/**
- * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- * output nodes and storage nodes. They are there driving force behind the completion of a query.
+ * <h2>Functionality</h2>
+ * <p>
+ * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
+ * output nodes and storage nodes. They are the driving force behind the completion of a query.
+ * </p>
+ * <h2>Assumptions</h2>
+ * <p>
+ * All implementations of {@link RootExec} assume that all their methods are called by the same thread.
+ * </p>
*/
public interface RootExec extends AutoCloseable {
/**
* Do the next batch of work.
- * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
+ * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
*/
- public boolean next();
+ boolean next();
/**
- * Inform sender that receiving fragment is finished and doesn't need any more data
- * @param handle
+ * Inform sender that receiving fragment is finished and doesn't need any more data. This can be called multiple
+ * times (once for each downstream receiver). If all receivers are finished then a subsequent call to {@link #next()}
+ * will return false.
+ * @param handle The handle pointing to the downstream receiver that does not need any more data.
*/
- public void receivingFragmentFinished(FragmentHandle handle);
+ void receivingFragmentFinished(FragmentHandle handle);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 25be50a..7e76238 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -65,14 +65,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
private PartitionerDecorator partitioner;
private ExchangeFragmentContext context;
- private boolean ok = true;
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final double cost;
private final AtomicIntegerArray remainingReceivers;
private final AtomicInteger remaingReceiverCount;
- private volatile boolean done = false;
+ private boolean done = false;
private boolean first = true;
private boolean closeIncoming;
@@ -146,11 +145,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
@Override
public boolean innerNext() {
- if (!ok) {
- return false;
- }
-
IterOutcome out;
+
if (!done) {
out = next(incoming);
} else {
@@ -252,13 +248,11 @@ public class PartitionSenderRootExec extends BaseRootExec {
startIndex, endIndex);
}
- synchronized (this) {
- partitioner = new PartitionerDecorator(subPartitioners, stats, context);
- for (int index = 0; index < terminations.size(); index++) {
- partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
- }
- terminations.clear();
+ partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+ for (int index = 0; index < terminations.size(); index++) {
+ partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
}
+ terminations.clear();
success = true;
} finally {
@@ -328,12 +322,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
public void receivingFragmentFinished(FragmentHandle handle) {
final int id = handle.getMinorFragmentId();
if (remainingReceivers.compareAndSet(id, 0, 1)) {
- synchronized (this) {
- if (partitioner == null) {
- terminations.add(id);
- } else {
- partitioner.getOutgoingBatches(id).terminate();
- }
+ if (partitioner == null) {
+ terminations.add(id);
+ } else {
+ partitioner.getOutgoingBatches(id).terminate();
}
int remaining = remaingReceiverCount.decrementAndGet();
@@ -347,7 +339,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
public void close() throws Exception {
logger.debug("Partition sender stopping.");
super.close();
- ok = false;
+
if (partitioner != null) {
updateAggregateStats();
partitioner.clear();
@@ -358,7 +350,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
- public void sendEmptyBatch(boolean isLast) {
+ private void sendEmptyBatch(boolean isLast) {
BatchSchema schema = incoming.getSchema();
if (schema == null) {
// If the incoming batch has no schema (possible when there are no input records),
http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 4f43dc1..efdb96a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.work.fragment;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -31,7 +33,6 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
@@ -49,14 +50,58 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.hadoop.security.UserGroupInformation;
/**
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
- * and cancellation messages.
+ * <h2>Overview</h2>
+ * <p>
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages.
+ * </p>
+ * <h2>Theory of Operation</h2>
+ * <p>
+ * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running
+ * it may be subject to termination requests. The {@link FragmentExecutor} is responsible for gracefully handling termination requests for the {@link RootExec}. There
+ * are two types of termination messages:
+ * <ol>
+ * <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li>
+ * <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs any more data. A fragment may receive multiple receiver finished requests
+ * (one for each downstream receiver). The {@link RootExec} will only terminate once it has received {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages
+ * for all downstream receivers.</li>
+ * </ol>
+ * </p>
+ * <p>
+ * The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when
+ * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
+ * called. The way in which these signals are handled is the following:
+ * </p>
+ * <h3>Cancellation Request</h3>
+ * <p>
+ * There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called.
+ * <ol>
+ * <li>The Cancellation Request is received before the {@link RootExec} for the fragment is even started. In this case we can clean up resources allocated for the fragment
+ * and never start a {@link RootExec}.</li>
+ * <li>The Cancellation Request is received after the {@link RootExec} for the fragment is started. In this case the cancellation request is sent to the
+ * {@link FragmentEventProcessor}. If this is not the first cancellation request, it is ignored. If this is the first cancellation request, the {@link RootExec} for this
+ * fragment is terminated by interrupting it. Then the {@link FragmentExecutor#run()} thread proceeds to clean up resources normally.</li>
+ * </ol>
+ * </p>
+ * <h3>Receiver Finished</h3>
+ * <p>
+ * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we
+ * did not already receive a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} thread polls
+ * {@link FragmentExecutor#receiverFinishedQueue} and signals the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
+ * </p>
+ * <h2>Possible Design Flaws / Poorly Defined Behavior</h2>
+ * <p>
+ * There are still a few aspects of the {@link FragmentExecutor} design that are not clear.
+ * <ol>
+ * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li>
+ * </ol>
+ * </p>
*/
public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
- private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
private final String fragmentName;
private final ExecutorFragmentContext fragmentContext;
private final FragmentStatusReporter statusReporter;
@@ -66,6 +111,11 @@ public class FragmentExecutor implements Runnable {
private volatile RootExec root;
private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+ /**
+ * Holds all of the messages sent by downstream receivers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
+ * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
+ */
+ private final Queue<FragmentHandle> receiverFinishedQueue = new ConcurrentLinkedQueue<>();
private final FragmentEventProcessor eventProcessor = new FragmentEventProcessor();
// Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or finished
@@ -135,12 +185,16 @@ public class FragmentExecutor implements Runnable {
}
/**
+ * <p>
* Cancel the execution of this fragment is in an appropriate state. Messages come from external.
- * NOTE that this will be called from threads *other* than the one running this runnable(),
+ * </p>
+ * <p>
+ * <b>Note:</b> This will be called from threads <b>other</b> than the one running this runnable(),
* so we need to be careful about the state transitions that can result.
+ * </p>
*/
public void cancel() {
- final boolean thisIsOnlyThread = hasCloseoutThread.compareAndSet(false, true);
+ final boolean thisIsOnlyThread = myThreadRef.compareAndSet(null, Thread.currentThread());
if (thisIsOnlyThread) {
eventProcessor.cancelAndFinish();
@@ -182,13 +236,12 @@ public class FragmentExecutor implements Runnable {
@SuppressWarnings("resource")
@Override
public void run() {
- // if a cancel thread has already entered this executor, we have not reason to continue.
- if (!hasCloseoutThread.compareAndSet(false, true)) {
+ final Thread myThread = Thread.currentThread();
+
+ if (!myThreadRef.compareAndSet(null, myThread)) {
return;
}
- final Thread myThread = Thread.currentThread();
- myThreadRef.set(myThread);
final String originalThreadName = myThread.getName();
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
final ClusterCoordinator clusterCoordinator = fragmentContext.getClusterCoordinator();
@@ -203,10 +256,10 @@ public class FragmentExecutor implements Runnable {
final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
fragmentContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
- root = ImplCreator.getExec(fragmentContext, rootOperator);
- if (root == null) {
- return;
- }
+ root = ImplCreator.getExec(fragmentContext, rootOperator);
+ if (root == null) {
+ return;
+ }
clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
updateState(FragmentState.RUNNING);
@@ -227,11 +280,19 @@ public class FragmentExecutor implements Runnable {
@Override
public Void run() throws Exception {
injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
- /*
- * Run the query until root.next returns false OR we no longer need to continue.
- */
- while (shouldContinue() && root.next()) {
- // loop
+
+ while (shouldContinue()) {
+ // Fragment is not cancelled
+
+ for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
+ // See if we have any finished requests. If so execute them.
+ root.receivingFragmentFinished(fragmentHandle);
+ }
+
+ if (!root.next()) {
+ // Fragment has processed all of its data
+ break;
+ }
}
return null;
@@ -245,19 +306,17 @@ public class FragmentExecutor implements Runnable {
// we have a heap out of memory error. The JVM in unstable, exit.
CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
}
+ } catch (InterruptedException e) {
+ // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
+ logger.trace("Interruped root: {}", e);
} catch (Throwable t) {
fail(t);
} finally {
- // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
- // interruption after we have moved beyond this block.
- synchronized (myThreadRef) {
- myThreadRef.set(null);
- Thread.interrupted();
- }
-
- // Make sure the event processor is started at least once
- eventProcessor.start();
+ // Don't process any more termination requests, we are done.
+ eventProcessor.terminate();
+ // Clear the interrupt flag if it is set.
+ Thread.interrupted();
// here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
// FAILED state will be because of any Exception in execution loop root.next()
@@ -475,6 +534,7 @@ public class FragmentExecutor implements Runnable {
* This is especially important as fragments can take longer to start
*/
private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
+ private AtomicBoolean terminate = new AtomicBoolean(false);
void cancel() {
sendEvent(new FragmentEvent(EventType.CANCEL, null));
@@ -488,47 +548,72 @@ public class FragmentExecutor implements Runnable {
sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
}
+ /**
+ * Tell the {@link FragmentEventProcessor} not to process any more events. This keeps stray cancellation requests
+ * from being processed after the root has finished running and interrupts in the root thread have been cleared.
+ */
+ public void terminate() {
+ terminate.set(true);
+ }
+
@Override
protected void processEvent(FragmentEvent event) {
+ if (event.type.equals(EventType.RECEIVER_FINISHED)) {
+ // Finish request
+ if (terminate.get()) {
+ // We have already received a cancellation or we have terminated the event processor. Do not process any more finish requests.
+ return;
+ }
+ } else {
+ // Cancel request
+ if (!terminate.compareAndSet(false, true)) {
+ // We have already received a cancellation or we have terminated the event processor. Do not process any more cancellation requests.
+ // This prevents the root thread from being interrupted at an inappropriate time.
+ return;
+ }
+ }
+
switch (event.type) {
case CANCEL:
- /*
- * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
- */
+ // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
updateState(FragmentState.CANCELLATION_REQUESTED);
-
- /*
- * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
- * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
- * procedure of the main thread.
- */
- synchronized (myThreadRef) {
- final Thread myThread = myThreadRef.get();
- if (myThread != null) {
- logger.debug("Interrupting fragment thread {}", myThread.getName());
- myThread.interrupt();
- }
- }
+ // The root was started so we have to interrupt it in case it is performing a blocking operation.
+ killThread();
break;
-
case CANCEL_AND_FINISH:
+ // In this case the root was never started so we do not have to interrupt the thread.
updateState(FragmentState.CANCELLATION_REQUESTED);
+ // The FragmentExecutor#run() loop will not execute in this case so we have to cleanup resources here
cleanup(FragmentState.FINISHED);
break;
-
case RECEIVER_FINISHED:
assert event.handle != null : "RECEIVER_FINISHED event must have a handle";
if (root != null) {
logger.info("Applying request for early sender termination for {} -> {}.",
QueryIdHelper.getQueryIdentifier(getContext().getHandle()),
QueryIdHelper.getFragmentId(event.handle));
- root.receivingFragmentFinished(event.handle);
+
+ receiverFinishedQueue.add(event.handle);
} else {
logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(event.handle));
}
+ // Note we do not terminate the event processor in this case since we can receive multiple RECEIVER_FINISHED
+ // events. One for each downstream receiver.
break;
}
}
+
+ /*
+ * Interrupt the fragment thread so that it exits from any blocking operation it may currently be executing. No
+ * lock is taken here: run() calls terminate() before clearing its interrupt flag, and processEvent() drops cancel
+ * requests once terminate is set, which is meant to keep stray interrupts out of run()'s close-out procedure.
+ */
+ private void killThread() {
+ // myThreadRef must contain a non-null reference at this point
+ final Thread myThread = myThreadRef.get();
+ logger.debug("Interrupting fragment thread {}", myThread.getName());
+ myThread.interrupt();
+ }
}
}