You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/03/30 14:41:20 UTC

[7/7] drill git commit: DRILL-6125: Fix possible memory leak when query is cancelled or finished.

DRILL-6125: Fix possible memory leak when query is cancelled or finished.

close apache/drill#1105


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a264e7fe
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a264e7fe
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a264e7fe

Branch: refs/heads/master
Commit: a264e7feb1d02ffd5762bb1f652ea22d17aa5243
Parents: 03b245e
Author: Timothy Farkas <ti...@apache.org>
Authored: Tue Jan 30 15:55:41 2018 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu Mar 29 23:24:09 2018 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/RootExec.java      |  23 ++-
 .../PartitionSenderRootExec.java                |  32 ++--
 .../exec/work/fragment/FragmentExecutor.java    | 179 ++++++++++++++-----
 3 files changed, 160 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 5e366fb..ddeb3e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -20,19 +20,28 @@ package org.apache.drill.exec.physical.impl;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 /**
- * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- * output nodes and storage nodes.  They are there driving force behind the completion of a query.
+ * <h2>Functionality</h2>
+ * <p>
+ *   A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
+ *   output nodes and storage nodes.  They are the driving force behind the completion of a query.
+ * </p>
+ * <h2>Assumptions</h2>
+ * <p>
+ *   All implementations of {@link RootExec} assume that all their methods are called by the same thread.
+ * </p>
  */
 public interface RootExec extends AutoCloseable {
   /**
    * Do the next batch of work.
-   * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
+   * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
    */
-  public boolean next();
+  boolean next();
 
   /**
-   * Inform sender that receiving fragment is finished and doesn't need any more data
-   * @param handle
+   * Inform sender that receiving fragment is finished and doesn't need any more data. This can be called multiple
+   * times (once for each downstream receiver). If all receivers are finished then a subsequent call to {@link #next()}
+   * will return false.
+   * @param handle The handle pointing to the downstream receiver that does not need anymore data.
    */
-  public void receivingFragmentFinished(FragmentHandle handle);
+  void receivingFragmentFinished(FragmentHandle handle);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 25be50a..7e76238 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -65,14 +65,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private PartitionerDecorator partitioner;
 
   private ExchangeFragmentContext context;
-  private boolean ok = true;
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final double cost;
 
   private final AtomicIntegerArray remainingReceivers;
   private final AtomicInteger remaingReceiverCount;
-  private volatile boolean done = false;
+  private boolean done = false;
   private boolean first = true;
   private boolean closeIncoming;
 
@@ -146,11 +145,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   @Override
   public boolean innerNext() {
-    if (!ok) {
-      return false;
-    }
-
     IterOutcome out;
+
     if (!done) {
       out = next(incoming);
     } else {
@@ -252,13 +248,11 @@ public class PartitionSenderRootExec extends BaseRootExec {
             startIndex, endIndex);
       }
 
-      synchronized (this) {
-        partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-        for (int index = 0; index < terminations.size(); index++) {
-          partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
-        }
-        terminations.clear();
+      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+      for (int index = 0; index < terminations.size(); index++) {
+        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
       }
+      terminations.clear();
 
       success = true;
     } finally {
@@ -328,12 +322,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public void receivingFragmentFinished(FragmentHandle handle) {
     final int id = handle.getMinorFragmentId();
     if (remainingReceivers.compareAndSet(id, 0, 1)) {
-      synchronized (this) {
-        if (partitioner == null) {
-          terminations.add(id);
-        } else {
-          partitioner.getOutgoingBatches(id).terminate();
-        }
+      if (partitioner == null) {
+        terminations.add(id);
+      } else {
+        partitioner.getOutgoingBatches(id).terminate();
       }
 
       int remaining = remaingReceiverCount.decrementAndGet();
@@ -347,7 +339,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public void close() throws Exception {
     logger.debug("Partition sender stopping.");
     super.close();
-    ok = false;
+
     if (partitioner != null) {
       updateAggregateStats();
       partitioner.clear();
@@ -358,7 +350,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  public void sendEmptyBatch(boolean isLast) {
+  private void sendEmptyBatch(boolean isLast) {
     BatchSchema schema = incoming.getSchema();
     if (schema == null) {
       // If the incoming batch has no schema (possible when there are no input records),

http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 4f43dc1..efdb96a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.work.fragment;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -31,7 +33,6 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -49,14 +50,58 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
- * and cancellation messages.
+ * <h2>Overview</h2>
+ * <p>
+ *   Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages.
+ * </p>
+ * <h2>Theory of Operation</h2>
+ * <p>
+ *  The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running
+ *  it may be subject to termination requests. The {@link FragmentExecutor} is responsible for gracefully handling termination requests for the {@link RootExec}. There
+ *  are two types of termination messages:
+ *  <ol>
+ *    <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li>
+ *    <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs any more data. A fragment may receive multiple receiver finished requests
+ *    (one for each downstream receiver). The {@link RootExec} will only terminate once it has received {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages
+ *    for all downstream receivers.</li>
+ *  </ol>
+ * </p>
+ * <p>
+ *   The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when
+ *   {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
+ *   called. The way in which these signals are handled is the following:
+ * </p>
+ * <h3>Cancellation Request</h3>
+ * <p>
+ *   There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called.
+ *   <ol>
+ *     <li>The Cancellation Request is received before the {@link RootExec} for the fragment is even started. In this case we can clean up resources allocated for the fragment
+ *     and never start a {@link RootExec}</li>
+ *     <li>The Cancellation Request is received after the {@link RootExec} for the fragment is started. In this case the cancellation request is sent to the
+ *     {@link FragmentEventProcessor}. If this is not the first cancellation request it is ignored. If this is the first cancellation request the {@link RootExec} for this
+ *     fragment is terminated by interrupting it. Then the {@link FragmentExecutor#run()} thread proceeds to cleanup resources normally</li>
+ *   </ol>
+ * </p>
+ * <h3>Receiver Finished</h3>
+ * <p>
+ *  When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we
+ *  did not already receive a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} polls
+ *  {@link FragmentExecutor#receiverFinishedQueue} and signals the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
+ * </p>
+ * <h2>Possible Design Flaws / Poorly Defined Behavior</h2>
+ * <p>
+ *   There are still a few aspects of the {@link FragmentExecutor} design that are not clear.
+ *   <ol>
+ *     <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li>
+ *     <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li>
+ *     <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li>
+ *   </ol>
+ * </p>
  */
 public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
 
-  private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
   private final String fragmentName;
   private final ExecutorFragmentContext fragmentContext;
   private final FragmentStatusReporter statusReporter;
@@ -66,6 +111,11 @@ public class FragmentExecutor implements Runnable {
 
   private volatile RootExec root;
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+  /**
+   * Holds all of the messages sent by downstream receivers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
+   * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
+   */
+  private final Queue<FragmentHandle> receiverFinishedQueue = new ConcurrentLinkedQueue<>();
   private final FragmentEventProcessor eventProcessor = new FragmentEventProcessor();
 
   // Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or finished
@@ -135,12 +185,16 @@ public class FragmentExecutor implements Runnable {
   }
 
   /**
+   * <p>
    * Cancel the execution of this fragment is in an appropriate state. Messages come from external.
-   * NOTE that this will be called from threads *other* than the one running this runnable(),
+   * </p>
+   * <p>
+   * <b>Note:</b> This will be called from threads <b>Other</b> than the one running this runnable(),
    * so we need to be careful about the state transitions that can result.
+   * </p>
    */
   public void cancel() {
-    final boolean thisIsOnlyThread = hasCloseoutThread.compareAndSet(false, true);
+    final boolean thisIsOnlyThread = myThreadRef.compareAndSet(null, Thread.currentThread());
 
     if (thisIsOnlyThread) {
       eventProcessor.cancelAndFinish();
@@ -182,13 +236,12 @@ public class FragmentExecutor implements Runnable {
   @SuppressWarnings("resource")
   @Override
   public void run() {
-    // if a cancel thread has already entered this executor, we have not reason to continue.
-    if (!hasCloseoutThread.compareAndSet(false, true)) {
+    final Thread myThread = Thread.currentThread();
+
+    if (!myThreadRef.compareAndSet(null, myThread)) {
       return;
     }
 
-    final Thread myThread = Thread.currentThread();
-    myThreadRef.set(myThread);
     final String originalThreadName = myThread.getName();
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
     final ClusterCoordinator clusterCoordinator = fragmentContext.getClusterCoordinator();
@@ -203,10 +256,10 @@ public class FragmentExecutor implements Runnable {
       final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
           fragmentContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
 
-          root = ImplCreator.getExec(fragmentContext, rootOperator);
-          if (root == null) {
-            return;
-          }
+      root = ImplCreator.getExec(fragmentContext, rootOperator);
+      if (root == null) {
+        return;
+      }
 
       clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
       updateState(FragmentState.RUNNING);
@@ -227,11 +280,19 @@ public class FragmentExecutor implements Runnable {
         @Override
         public Void run() throws Exception {
           injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
-          /*
-           * Run the query until root.next returns false OR we no longer need to continue.
-           */
-          while (shouldContinue() && root.next()) {
-            // loop
+
+          while (shouldContinue()) {
+            // Fragment is not cancelled
+
+            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
+              // See if we have any finished requests. If so execute them.
+              root.receivingFragmentFinished(fragmentHandle);
+            }
+
+            if (!root.next()) {
+              // Fragment has processed all of its data
+              break;
+            }
           }
 
           return null;
@@ -245,19 +306,17 @@ public class FragmentExecutor implements Runnable {
         // we have a heap out of memory error. The JVM in unstable, exit.
         CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
       }
+    } catch (InterruptedException e) {
+      // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
+      logger.trace("Interruped root: {}", e);
     } catch (Throwable t) {
       fail(t);
     } finally {
 
-      // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
-      // interruption after we have moved beyond this block.
-      synchronized (myThreadRef) {
-        myThreadRef.set(null);
-        Thread.interrupted();
-      }
-
-      // Make sure the event processor is started at least once
-      eventProcessor.start();
+      // Don't process any more termination requests, we are done.
+      eventProcessor.terminate();
+      // Clear the interrupt flag if it is set.
+      Thread.interrupted();
 
       // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
       // FAILED state will be because of any Exception in execution loop root.next()
@@ -475,6 +534,7 @@ public class FragmentExecutor implements Runnable {
    * This is especially important as fragments can take longer to start
    */
   private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
+    private AtomicBoolean terminate = new AtomicBoolean(false);
 
     void cancel() {
       sendEvent(new FragmentEvent(EventType.CANCEL, null));
@@ -488,47 +548,72 @@ public class FragmentExecutor implements Runnable {
       sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
     }
 
+    /**
+     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
+     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
+     */
+    public void terminate() {
+      terminate.set(true);
+    }
+
     @Override
     protected void processEvent(FragmentEvent event) {
+      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
+        // Finish request
+        if (terminate.get()) {
+          // We have already received a cancellation or we have terminated the event processor. Do not process any more finish requests.
+          return;
+        }
+      } else {
+        // Cancel request
+        if (!terminate.compareAndSet(false, true)) {
+          // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests.
+          // This prevents the root thread from being interrupted at an inappropriate time.
+          return;
+        }
+      }
+
       switch (event.type) {
         case CANCEL:
-          /*
-           * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
-           */
+          // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
           updateState(FragmentState.CANCELLATION_REQUESTED);
-
-          /*
-           * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
-           * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
-           * procedure of the main thread.
-          */
-          synchronized (myThreadRef) {
-            final Thread myThread = myThreadRef.get();
-            if (myThread != null) {
-              logger.debug("Interrupting fragment thread {}", myThread.getName());
-              myThread.interrupt();
-            }
-          }
+          // The root was started so we have to interrupt it in case it is performing a blocking operation.
+          killThread();
           break;
-
         case CANCEL_AND_FINISH:
+          // In this case the root was never started so we do not have to interrupt the thread.
           updateState(FragmentState.CANCELLATION_REQUESTED);
+          // The FragmentExecutor#run() loop will not execute in this case so we have to cleanup resources here
           cleanup(FragmentState.FINISHED);
           break;
-
         case RECEIVER_FINISHED:
           assert event.handle != null : "RECEIVER_FINISHED event must have a handle";
           if (root != null) {
             logger.info("Applying request for early sender termination for {} -> {}.",
               QueryIdHelper.getQueryIdentifier(getContext().getHandle()),
               QueryIdHelper.getFragmentId(event.handle));
-            root.receivingFragmentFinished(event.handle);
+
+            receiverFinishedQueue.add(event.handle);
           } else {
             logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
               QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(event.handle));
           }
+          // Note we do not terminate the event processor in this case since we can receive multiple RECEIVER_FINISHED
+          // events. One for each downstream receiver.
           break;
       }
     }
+
+    /*
+     * Interrupt the thread so that it exits from any blocking operation it could be executing currently. No
+     * lock is taken here; the terminate flag checked in processEvent is what keeps a late cancellation from
+     * interrupting the close out procedure of the main thread.
+    */
+    private void killThread() {
+      // myThreadRef must contain a non-null reference at this point
+      final Thread myThread = myThreadRef.get();
+      logger.debug("Interrupting fragment thread {}", myThread.getName());
+      myThread.interrupt();
+    }
   }
 }