Posted to dev@drill.apache.org by ilooner <gi...@git.apache.org> on 2018/01/31 18:58:32 UTC

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

GitHub user ilooner opened a pull request:

    https://github.com/apache/drill/pull/1105

    DRILL-6125: Fix possible memory leak when query is cancelled.

    A detailed description of the problem and solution can be found here: 
    
    https://issues.apache.org/jira/browse/DRILL-6125

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/drill DRILL-6125

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1105
    
----
commit 1d1725a276c058e8c09e456963bac928d1f062ed
Author: Timothy Farkas <ti...@...>
Date:   2018-01-30T23:55:41Z

    DRILL-6125: Fix possible memory leak when query is cancelled.

----


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173300352
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -67,6 +88,11 @@
     
       private volatile RootExec root;
       private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
    +  /**
    +   * Holds all of the messages sent by downstream recievers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
    +   * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
    +   */
    +  private final Queue<FragmentHandle> recieverFinishedQueue = new ConcurrentLinkedQueue<>();
    --- End diff --
    
    please fix typo (receiver)


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @ilooner What state is protected by `synchronized`? Why is it not sufficient to use `volatile` and `AtomicReference`?


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173528013
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
    +        // Finish request
    +        if (terminate.get()) {
    +          // We have already recieved a cancellation or we have terminated the event processor. Do not process anymore finish requests.
    +          return;
    +        }
    +      } else {
    +        // Cancel request
    +        if (!terminate.compareAndSet(false, true)) {
    +          // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests.
    +          // This prevents the root thread from being interrupted at an inappropriate time.
    +          return;
    +        }
    +      }
    +
           switch (event.type) {
             case CANCEL:
    -          /*
    -           * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
    -           */
    +          // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    -
    -          /*
    -           * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
    -           * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
    -           * procedure of the main thread.
    -          */
    -          synchronized (myThreadRef) {
    -            final Thread myThread = myThreadRef.get();
    -            if (myThread != null) {
    -              logger.debug("Interrupting fragment thread {}", myThread.getName());
    -              myThread.interrupt();
    -            }
    -          }
    +          // The root was started so we have to interrupt it in case it is performing a blocking operation.
    +          killThread();
    +          terminate.set(true);
               break;
    -
             case CANCEL_AND_FINISH:
    +          // In this case the root was never started so we do not have to interrupt the thread.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    +          // The FragmentExecutor#run() loop will not execute in this case so we have to cleanup resources here
               cleanup(FragmentState.FINISHED);
    +          terminate.set(true);
    --- End diff --
    
    This is not necessary (`terminate` should already be true).


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173300621
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -231,9 +261,9 @@ public Void run() throws Exception {
               while (shouldContinue()) {
                 // Fragment is not cancelled
     
    -            if (eventProcessor.hasFinishedRequests()) {
    +            if (!recieverFinishedQueue.isEmpty()) {
    --- End diff --
    
    I don't think `isEmpty()` is necessary; it is the same as `poll() != null`.
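
A minimal sketch of the point above, using a hypothetical `DrainQueueSketch` class and made-up handle names rather than the Drill code: `poll()` returns `null` when the queue is empty, so a loop on `poll()` both tests for emptiness and removes the element, and no separate `isEmpty()` call is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class DrainQueueSketch {
  // Removes and returns everything currently in the queue.
  // A null from poll() means the queue was empty at that moment.
  static <T> List<T> drain(Queue<T> queue) {
    List<T> drained = new ArrayList<>();
    for (T item; (item = queue.poll()) != null;) {
      drained.add(item);
    }
    return drained;
  }

  public static void main(String[] args) {
    Queue<String> finished = new ConcurrentLinkedQueue<>();
    finished.add("fragment-1");
    finished.add("fragment-2");
    System.out.println(drain(finished)); // prints [fragment-1, fragment-2]
    System.out.println(drain(finished)); // prints []
  }
}
```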


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173224242
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -475,6 +483,8 @@ public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint>
        * This is especially important as fragments can take longer to start
        */
       private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
    +    private boolean terminate = false;
    +    private List<FragmentHandle> finishedHandles = Lists.newArrayList();
    --- End diff --
    
    Consider using Queue and possibly concurrent Queue and moving `finishedHandles` to FragmentExecutor.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1105


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r170782805
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -62,7 +62,7 @@
       private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
       private RecordBatch incoming;
       private HashPartitionSender operator;
    -  private PartitionerDecorator partitioner;
    +  private volatile PartitionerDecorator partitioner;
    --- End diff --
    
    Consider using `AtomicReference` instead of `volatile` 
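
The reviewer does not say why `AtomicReference` might be preferable. One possible reason, stated here as an assumption, is that `getAndSet(null)` lets exactly one caller take ownership of the object and clear it. The sketch below uses a hypothetical `ClearOnceSketch` class with a `StringBuilder` standing in for the partitioner.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClearOnceSketch {
  // Hypothetical stand-in for the partitioner field.
  private final AtomicReference<StringBuilder> partitioner =
      new AtomicReference<>(new StringBuilder("buffers"));

  // Returns true only for the single caller that actually took the reference.
  boolean clear() {
    StringBuilder taken = partitioner.getAndSet(null); // atomic swap with null
    if (taken == null) {
      return false; // someone else already cleared it
    }
    taken.setLength(0); // stands in for releasing the partitioner's buffers
    return true;
  }

  public static void main(String[] args) {
    ClearOnceSketch sketch = new ClearOnceSketch();
    System.out.println(sketch.clear()); // prints true
    System.out.println(sketch.clear()); // prints false
  }
}
```

Note that this only makes the clear itself happen once; it does not stop another thread from installing a new object afterwards.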


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173527617
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -227,11 +280,19 @@ public void run() {
             @Override
             public Void run() throws Exception {
               injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
    -          /*
    -           * Run the query until root.next returns false OR we no longer need to continue.
    -           */
    -          while (shouldContinue() && root.next()) {
    -            // loop
    +
    +          while (shouldContinue()) {
    +            // Fragment is not cancelled
    +
    +            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
    --- End diff --
    
    How does this processing affect `root.next()`? Should it also (or only) be done after the fragment is cancelled or done (after the while loop)?


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173229346
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -475,6 +483,8 @@ public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint>
        * This is especially important as fragments can take longer to start
        */
       private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
    +    private boolean terminate = false;
    --- End diff --
    
    Consider using AtomicBoolean and avoiding `synchronized`.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @arina-ielchiieva @vrozov I believe I have a solution. There were several issues with the original code.
    
    1. It made incorrect assumptions about how cache invalidation works with Java **synchronized**.
    2. It assumed **innerNext** and **close** would be called sequentially.
    
    I believe this PR fixes these issues now and I have gone into more detail about the problems below.
    
    # 1. Incorrect Cache Invalidation Assumptions
    
    The original code was trying to be smart by reducing synchronization overhead on **innerNext**. The code in **innerNext** did not synchronize before changing the partitioner object, since it would be called often. The code in **close()** and **receivingFragmentFinished()** synchronized before accessing the partitioner, with the intention that this would trigger an update of the partitioner variable state across all threads. Unfortunately, this assumption is invalid (see https://stackoverflow.com/questions/22706739/does-synchronized-guarantee-a-thread-will-see-the-latest-value-of-a-non-volatile). Every thread that accesses a variable must synchronize before accessing it in order to properly invalidate cached data on a core. 
    
    For example, if **Thread A** modifies **Variable 1** without synchronizing and **Thread B** then synchronizes before accessing **Variable 1**, there is no guarantee that **Thread B** will see the most recent value of **Variable 1**.
    
    ## Solution
    
    In summary the right thing to do is the simple thing. Make the methods synchronized. Unfortunately there is no way to outsmart the system and reduce synchronization overhead without causing race conditions.
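
A minimal sketch of the rule described above, assuming a hypothetical `SharedSlot` class (not Drill code): the writer and the reader both lock the same monitor, which is what the Java memory model requires before the reader is guaranteed to see the write.

```java
class SharedSlot {
  private String value; // guarded by "this"

  // Writer side: releasing the monitor publishes the write.
  synchronized void set(String newValue) {
    value = newValue;
  }

  // Reader side: acquiring the same monitor makes the published write visible.
  synchronized String get() {
    return value;
  }

  public static void main(String[] args) {
    SharedSlot slot = new SharedSlot();
    Thread writer = new Thread(() -> slot.set("partitioner-v2"));
    writer.start();
    // Poll until the write is visible. This is guaranteed to terminate only
    // because get() and set() synchronize on the same monitor.
    while (slot.get() == null) {
      Thread.yield();
    }
    System.out.println(slot.get()); // prints partitioner-v2
  }
}
```

If only `get()` were synchronized, the loop above would have no visibility guarantee, which is the mistake described in this section.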
    
    # 2. Concurrent InnerNext and Close Calls
    
    The original code did not consider the case that innerNext was in the middle of execution when close was called. It did try to handle the case where **innerNext** could be called after **close** by setting the **ok** variable. But it didn't even do that right because there was no synchronization around the **ok** variable.
    
    ## Solution
    
    The right thing to do is the simple thing. Make sure the methods are synchronized so close has to wait until innerNext is done before executing. Also, when a query is cancelled, the thread running innerNext should be interrupted in case it is in a blocking call.
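
The interrupt part of this solution can be sketched as follows. `InterruptBlockedCall` is a hypothetical class, and the `CountDownLatch` only stands in for whatever blocking call innerNext might be making; it is not how Drill blocks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptBlockedCall {
  // Starts a worker that blocks indefinitely, interrupts it, and reports
  // whether the worker observed the interrupt as an InterruptedException.
  static boolean interruptBlockedWorker() {
    CountDownLatch neverReleased = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    Thread worker = new Thread(() -> {
      try {
        neverReleased.await(); // stands in for a blocking call inside innerNext
      } catch (InterruptedException e) {
        sawInterrupt.set(true); // cancel path: unwind so cleanup can proceed
      }
    });
    worker.start();
    worker.interrupt(); // what a cancellation request would do
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sawInterrupt.get();
  }

  public static void main(String[] args) {
    System.out.println(interruptBlockedWorker());
  }
}
```

Without the interrupt, the worker would never leave the blocking call, and a synchronized close would wait on it forever.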


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173526625
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -245,19 +306,17 @@ public Void run() throws Exception {
             // we have a heap out of memory error. The JVM in unstable, exit.
             CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
           }
    +    } catch (InterruptedException e) {
    +      // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
    +      logger.trace("Interruped root: {}", e);
         } catch (Throwable t) {
           fail(t);
         } finally {
     
    -      // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
    -      // interruption after we have moved beyond this block.
    -      synchronized (myThreadRef) {
    -        myThreadRef.set(null);
    -        Thread.interrupted();
    -      }
    -
    -      // Make sure the event processor is started at least once
    -      eventProcessor.start();
    +      // Don't process any more termination requests, we are done.
    +      eventProcessor.terminate();
    --- End diff --
    
    Is this terminate() required?


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173537455
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
    +        // Finish request
    +        if (terminate.get()) {
    +          // We have already recieved a cancellation or we have terminated the event processor. Do not process anymore finish requests.
    +          return;
    +        }
    +      } else {
    +        // Cancel request
    +        if (!terminate.compareAndSet(false, true)) {
    +          // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests.
    +          // This prevents the root thread from being interrupted at an inappropriate time.
    +          return;
    +        }
    +      }
    +
           switch (event.type) {
             case CANCEL:
    -          /*
    -           * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
    -           */
    +          // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    -
    -          /*
    -           * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
    -           * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
    -           * procedure of the main thread.
    -          */
    -          synchronized (myThreadRef) {
    -            final Thread myThread = myThreadRef.get();
    -            if (myThread != null) {
    -              logger.debug("Interrupting fragment thread {}", myThread.getName());
    -              myThread.interrupt();
    -            }
    -          }
    +          // The root was started so we have to interrupt it in case it is performing a blocking operation.
    +          killThread();
    +          terminate.set(true);
    --- End diff --
    
    Doh! Thanks for catching


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173367348
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -67,6 +88,11 @@
     
       private volatile RootExec root;
       private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
    +  /**
    +   * Holds all of the messages sent by downstream recievers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
    +   * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
    +   */
    +  private final Queue<FragmentHandle> recieverFinishedQueue = new ConcurrentLinkedQueue<>();
    --- End diff --
    
    Thanks for catching. Fixed all the misspellings of receiver.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173543287
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -245,19 +306,17 @@ public Void run() throws Exception {
             // we have a heap out of memory error. The JVM in unstable, exit.
             CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
           }
    +    } catch (InterruptedException e) {
    +      // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
    +      logger.trace("Interruped root: {}", e);
         } catch (Throwable t) {
           fail(t);
         } finally {
     
    -      // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
    -      // interruption after we have moved beyond this block.
    -      synchronized (myThreadRef) {
    -        myThreadRef.set(null);
    -        Thread.interrupted();
    -      }
    -
    -      // Make sure the event processor is started at least once
    -      eventProcessor.start();
    +      // Don't process any more termination requests, we are done.
    +      eventProcessor.terminate();
    --- End diff --
    
    There is a corner case. If we didn't include eventProcessor.terminate() we could theoretically receive a cancellation request for the first time after the interrupts were cleared for the FragmentExecutor#run thread. The cancellation would then interrupt the Thread again, and our FragmentExecutor would finish and leave the thread it used in an interrupted state. This could cause problems for the next FragmentExecutor that uses the same thread.
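
The flag semantics behind this corner case can be shown with a small sketch (hypothetical `InterruptFlagSketch` class). It only demonstrates that an interrupt stays set on the thread until something clears it; whether a stale flag actually reaches the next task depends on the executor in use, which this sketch does not model.

```java
import java.util.Arrays;

class InterruptFlagSketch {
  // Returns {flag after a late interrupt, result of Thread.interrupted(), flag afterwards}.
  static boolean[] lateInterrupt() {
    Thread current = Thread.currentThread();
    current.interrupt();                           // a cancel arriving after cleanup
    boolean stillSet = current.isInterrupted();    // the flag stays set on the thread
    boolean cleared = Thread.interrupted();        // reports the flag and clears it
    boolean afterClear = current.isInterrupted();  // now false
    return new boolean[] {stillSet, cleared, afterClear};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(lateInterrupt())); // prints [true, true, false]
  }
}
```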


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172327106
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -231,7 +231,7 @@ public boolean innerNext() {
       }
     
       @VisibleForTesting
    -  protected void createPartitioner() throws SchemaChangeException {
    +  protected synchronized void createPartitioner() throws SchemaChangeException {
    --- End diff --
    
    Removed


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172024966
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -358,7 +357,7 @@ public void close() throws Exception {
         }
       }
     
    -  public void sendEmptyBatch(boolean isLast) {
    +  public synchronized void sendEmptyBatch(boolean isLast) {
    --- End diff --
    
    The method can be made private. It also may not need to be synchronized, since it is called from `innerNext`.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @vrozov @arina-ielchiieva Applied review comments, please let me know if there are any more comments.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173303074
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +527,66 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public synchronized void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (terminate.get()) {
    --- End diff --
    
    use atomic compare and set to avoid race condition.
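
A minimal sketch of the suggestion, assuming a hypothetical `CancelOnceSketch` class in which a counter stands in for interrupting the root thread: `compareAndSet(false, true)` checks and sets the flag in one atomic step, so only one of several concurrent cancel requests gets through.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CancelOnceSketch {
  private final AtomicBoolean terminate = new AtomicBoolean(false);
  private final AtomicInteger cancelsHandled = new AtomicInteger();

  void onCancel() {
    // A separate get() followed by set(true) would let two threads both
    // pass the check; compareAndSet admits exactly one.
    if (!terminate.compareAndSet(false, true)) {
      return;
    }
    cancelsHandled.incrementAndGet(); // stands in for interrupting the root thread
  }

  // Fires onCancel() from several threads and returns how many were handled.
  static int race(int threadCount) {
    CancelOnceSketch processor = new CancelOnceSketch();
    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      threads[i] = new Thread(processor::onCancel);
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return processor.cancelsHandled.get();
  }

  public static void main(String[] args) {
    System.out.println(race(8)); // prints 1
  }
}
```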


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173367313
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -475,6 +483,8 @@ public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint>
        * This is especially important as fragments can take longer to start
        */
       private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
    +    private boolean terminate = false;
    --- End diff --
    
    Done


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173542042
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -227,11 +280,19 @@ public void run() {
             @Override
             public Void run() throws Exception {
               injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
    -          /*
    -           * Run the query until root.next returns false OR we no longer need to continue.
    -           */
    -          while (shouldContinue() && root.next()) {
    -            // loop
    +
    +          while (shouldContinue()) {
    +            // Fragment is not cancelled
    +
    +            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
    --- End diff --
    
    The original code allowed PartitionSenderRootExec#receivingFragmentFinished() to be called before, after or concurrently with PartitionSenderRootExec#innerNext(). The idea was that PartitionSenderRootExec#receivingFragmentFinished would terminate a partition. The terminate method of OutgoingRecordBatch would cause all outgoing records to a finished receiver to be dropped. So the finished downstream receivers would stop receiving data from the PartitionSenderRootExec, but the downstream receivers that did not finish would continue to receive records. There is a gray area here because I'm not sure why only some of the downstream receivers would send a RECEIVER_FINISHED message and others wouldn't, but the original design seems to make an assumption that this is a very common thing and is optimized for it. So I assume the original authors know something that we don't with respect to that.
    
    Also, calling receivingFragmentFinished after we have finished processing all the data would defeat the purpose, since the intention was to allow our fragment to terminate early if the downstream receivers decide they don't need any more data (e.g. a select limit query which only asks for the first 100 rows of a result with 10 million rows). If we called it only after the next() loop was done, we would always process all upstream records even when we didn't have to, and we would see significant degradation in the performance of limit queries.
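
The early-termination argument above can be sketched roughly as below. `EarlyExitLoopSketch`, the integer receiver ids, and the batch counter are all hypothetical simplifications; the real sender drops records per receiver rather than counting batches.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

class EarlyExitLoopSketch {
  // Produces up to totalBatches batches, but stops as soon as every receiver
  // has reported that it is finished. Returns the number of batches produced.
  static int produce(int totalBatches, int receiverCount, Queue<Integer> finishedQueue) {
    Set<Integer> finished = new HashSet<>();
    int produced = 0;
    while (produced < totalBatches) {
      // Check for finished receivers before each batch, not after the loop.
      for (Integer id; (id = finishedQueue.poll()) != null;) {
        finished.add(id);
      }
      if (finished.size() == receiverCount) {
        break; // nobody downstream wants more data
      }
      produced++;
    }
    return produced;
  }

  public static void main(String[] args) {
    Queue<Integer> finishedQueue = new ConcurrentLinkedQueue<>();
    finishedQueue.add(0);
    finishedQueue.add(1);
    System.out.println(produce(1_000_000, 2, finishedQueue)); // prints 0
  }
}
```

If the queue were drained only after the loop, `produce` would always return `totalBatches`, which is the limit-query slowdown described above.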
    



---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r169048352
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -348,9 +348,12 @@ public void close() throws Exception {
         logger.debug("Partition sender stopping.");
         super.close();
         ok = false;
    -    if (partitioner != null) {
    -      updateAggregateStats();
    -      partitioner.clear();
    +
    +    synchronized (this) {
    --- End diff --
    
    1. Should partitioner be volatile?
    2. Should we check if partitioner is not null before synchronization as well (DCL)?
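
For reference, the double-checked locking (DCL) shape being asked about would look roughly like this. `DoubleCheckedClose` is a hypothetical class with a `StringBuilder` standing in for the partitioner; the field must be `volatile` for the unlocked first check to be safe.

```java
class DoubleCheckedClose {
  // volatile so the unlocked first check reads a current value.
  private volatile StringBuilder partitioner = new StringBuilder("buffers");
  private int clears; // guarded by "this"

  void close() {
    if (partitioner != null) {       // first check, without the lock
      synchronized (this) {
        if (partitioner != null) {   // second check, with the lock held
          partitioner = null;        // stands in for partitioner.clear()
          clears++;
        }
      }
    }
  }

  synchronized int clears() {
    return clears;
  }

  public static void main(String[] args) {
    DoubleCheckedClose sender = new DoubleCheckedClose();
    sender.close();
    sender.close();
    System.out.println(sender.clears()); // prints 1
  }
}
```

This only protects close against concurrent calls to close. It does not, by itself, stop another method from allocating a new partitioner after the clear.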


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @vrozov @arina-ielchiieva Handled multiple receiver finished messages correctly. This PR is ready for another round of review.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @arina-ielchiieva Please review.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @arina-ielchiieva Applied review comments.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173545351
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -227,11 +280,19 @@ public void run() {
             @Override
             public Void run() throws Exception {
               injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
    -          /*
    -           * Run the query until root.next returns false OR we no longer need to continue.
    -           */
    -          while (shouldContinue() && root.next()) {
    -            // loop
    +
    +          while (shouldContinue()) {
    +            // Fragment is not cancelled
    +
    +            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
    --- End diff --
    
    OK, I see.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173367305
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -475,6 +483,8 @@ public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint>
        * This is especially important as fragments can take longer to start
        */
       private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
    +    private boolean terminate = false;
    +    private List<FragmentHandle> finishedHandles = Lists.newArrayList();
    --- End diff --
    
    Done


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172024881
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -245,6 +244,8 @@ public Void run() throws Exception {
             // we have a heap out of memory error. The JVM in unstable, exit.
             CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
           }
    +    } catch (InterruptedException e) {
    +      // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
    --- End diff --
    
    Maybe add logger.trace?


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @vrozov The main issue is that innerNext and close should not execute concurrently. Even when we use AtomicReference and volatile, the following sequence of events could happen and cause a memory leak:
    
      1. Let's say there are two threads: the **Close thread**, which starts at the beginning of the close method, and the **Next thread**, which starts at the beginning of the innerNext method.
      1. Now let's say the **Next thread** runs and checks **ok**. Since close has not been called yet, **ok** is true.
      1. Now the **Next thread** is past the ok check.
      1. The **Close thread** now starts executing and clears the partitioner.
      1. After the partitioner is cleared, the **Next thread** can resume executing. If the Next thread receives an OK_SCHEMA, it will allocate a new partitioner. Since the OK_SCHEMA message may include records, the partitioner may partition some data as well.
      1. Now the **Close thread** is done, but there is a partitioner that has not been closed, and we will leak memory.
    
    In order to properly solve this problem, we need to make sure that the innerNext and close methods are mutually exclusive so the above scenario can never happen. The easiest way to do that is to use the synchronized keyword. If we use the synchronized keyword, then we don't have to use volatile or AtomicReference.
    
    Also, as a side note, using synchronized will probably be more efficient, since a cache flush would only be triggered at the start of the innerNext and close methods. Alternatively, if we used volatile and AtomicReference, a cache flush would be triggered every time we accessed the ok and partitioner variables.
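
A minimal sketch of the mutual exclusion described above, assuming a much simplified stand-in for PartitionSenderRootExec. The class `SenderSketch`, its fields, and the list used in place of a partitioner are hypothetical and exist only for illustration; they are not Drill code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PartitionSenderRootExec; names and bodies are illustrative only.
final class SenderSketch {
  private boolean ok = true;         // guarded by "this"
  private List<String> partitioner;  // stands in for the real partitioner; guarded by "this"

  // The ok check and the allocation form one critical section, so close() cannot run in between.
  synchronized boolean innerNext() {
    if (!ok) {
      return false;
    }
    if (partitioner == null) {
      partitioner = new ArrayList<>(); // "allocate" a partitioner, as on OK_SCHEMA
    }
    partitioner.add("batch");
    return true;
  }

  // Flips ok and clears the partitioner under the same lock as innerNext().
  synchronized void close() {
    ok = false;
    partitioner = null;
  }

  synchronized boolean hasOpenPartitioner() {
    return partitioner != null;
  }

  public static void main(String[] args) throws InterruptedException {
    SenderSketch sender = new SenderSketch();
    // The "Next thread": keeps calling innerNext() until it is refused (bounded for safety).
    Thread next = new Thread(() -> {
      for (int i = 0; i < 1000 && sender.innerNext(); i++) { }
    });
    next.start();
    sender.close(); // the "Close thread" is the main thread here
    next.join();
    System.out.println("open partitioner after close: " + sender.hasOpenPartitioner());
  }
}
```

Because both methods lock the same object, any innerNext call that starts after close has returned sees ok as false and allocates nothing, so the program prints `open partitioner after close: false`.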



---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172327066
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -245,6 +244,8 @@ public Void run() throws Exception {
             // we have a heap out of memory error. The JVM in unstable, exit.
             CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2);
           }
    +    } catch (InterruptedException e) {
    +      // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
    --- End diff --
    
    Done


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    Squashed commits. @arina-ielchiieva please let me know if you have any comments.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173527951
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
    +        // Finish request
    +        if (terminate.get()) {
    +          // We have already recieved a cancellation or we have terminated the event processor. Do not process anymore finish requests.
    +          return;
    +        }
    +      } else {
    +        // Cancel request
    +        if (!terminate.compareAndSet(false, true)) {
    +          // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests.
    +          // This prevents the root thread from being interrupted at an inappropriate time.
    +          return;
    +        }
    +      }
    +
           switch (event.type) {
             case CANCEL:
    -          /*
    -           * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
    -           */
    +          // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    -
    -          /*
    -           * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
    -           * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
    -           * procedure of the main thread.
    -          */
    -          synchronized (myThreadRef) {
    -            final Thread myThread = myThreadRef.get();
    -            if (myThread != null) {
    -              logger.debug("Interrupting fragment thread {}", myThread.getName());
    -              myThread.interrupt();
    -            }
    -          }
    +          // The root was started so we have to interrupt it in case it is performing a blocking operation.
    +          killThread();
    +          terminate.set(true);
    --- End diff --
    
    it is not necessary (terminate should be already true).
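
A small sketch of why the extra `terminate.set(true)` is redundant, assuming the flag is a `java.util.concurrent.atomic.AtomicBoolean` as the diff suggests. The class `TerminateGuardSketch` and its methods are hypothetical; only the `AtomicBoolean` calls are real API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of the cancel guard in processEvent().
final class TerminateGuardSketch {
  private final AtomicBoolean terminate = new AtomicBoolean(false);
  private int cancelsHandled = 0; // only ever incremented by the single caller that wins the CAS

  // Returns true only for the first cancel request; later requests are ignored.
  boolean onCancel() {
    if (!terminate.compareAndSet(false, true)) {
      return false; // already terminated or cancelled
    }
    // Reaching this point means the CAS just stored true, so set(true) here would change nothing.
    cancelsHandled++;
    return true;
  }

  boolean isTerminated() {
    return terminate.get();
  }

  int cancelsHandled() {
    return cancelsHandled;
  }

  public static void main(String[] args) {
    TerminateGuardSketch guard = new TerminateGuardSketch();
    System.out.println(guard.onCancel());     // true
    System.out.println(guard.onCancel());     // false
    System.out.println(guard.isTerminated()); // true
  }
}
```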


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @priteshm @arina-ielchiieva I should have updated this PR earlier this week; here is my update. After reflecting on Arina's comments and reading some more docs about how Java implements volatile and synchronization, I think this solution might not fix the original race condition. I need to do more reading to get a better understanding. Additionally, I realized there is another race condition where two threads simultaneously calling close and innerNext could cause a memory leak. I haven't had a chance to dig further this week, so I will try to wrap this up next week.


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173367485
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -231,9 +261,9 @@ public Void run() throws Exception {
               while (shouldContinue()) {
                 // Fragment is not cancelled
     
    -            if (eventProcessor.hasFinishedRequests()) {
    +            if (!recieverFinishedQueue.isEmpty()) {
    --- End diff --
    
    Thanks for catching. Removed.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @arina-ielchiieva @vrozov In light of Vlad's comments I have reworked the synchronization model yet again. This change now removes all synchronization from PartitionSenderRootExec and enforces the guarantee that all the lifecycle methods of the PartitionSenderRootExec will only be called by a single Run thread in the FragmentExecutor. Also while making this change I discovered a few other bugs with how cancellations and receiver finishes are handled, so I have addressed those bugs as well. I will go into more detail about what I changed below.
    
    # Motivation
    
    As Vlad pointed out, **close** and **innerNext** are never called concurrently. After closer inspection of the code I also realized that currently (in Apache master) innerNext and close will always be called by the **FragmentExecutor#run** thread. The only method of PartitionSenderRootExec that is not called by the **FragmentExecutor#run** thread is **receivingFragmentFinished**. In order to simplify the implementation of PartitionSenderRootExec and also simplify the design of the FragmentExecutor, I changed the code so that only the **FragmentExecutor#run** thread calls **receivingFragmentFinished** as well. In this way we can remove all the synchronization from PartitionSenderRootExec. This was done by:
     1. Making the event processor save the FragmentHandle in the event that a receiver finish request was sent. 
     2. After the **root.next()** loop terminates in **FragmentExecutor#run** the eventProcessor is checked to see if a finish request was received. If so **receivingFragmentFinished** is called on root by the **FragmentExecutor#run** method.
    
    # Other Bugs
    
    ## Processing of multiple termination requests
    
    The event processor would process all cancellation and finish requests, even if there was more than one. This doesn't really make sense, since we can only cancel or finish once. So I have changed the event processor to execute only the first termination request and ignore all the others.
    
    ## Simultaneous Cancellation and Finishing
    
    Since the event processor would process multiple termination requests concurrently it was possible for a cancel and a finish message to be received and processed simultaneously. The results of this were not well defined since **close** and **receivingFragmentFinished** could be called concurrently.
    
    # Other Improvements
    
    Vlad also pointed out that we did not need the hasCloseoutThread atomic reference, since we were already using the myThreadRef atomic reference. That cleaned up the code a bit.
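
A rough sketch of the single-run-thread idea, loosely following the finished-receiver queue that appears in the FragmentExecutor diff. Everything here is hypothetical: handles are plain strings, `RunLoopSketch` is not a Drill class, and the list stands in for calls to `root.receivingFragmentFinished(handle)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of handing finish requests to the run thread instead of calling the root directly.
final class RunLoopSketch {
  // Written by the control/event thread, drained only by the run thread.
  private final Queue<String> receiverFinishedQueue = new ConcurrentLinkedQueue<>();
  // Touched only by the run thread, so it needs no synchronization.
  private final List<String> finishedSeenByRoot = new ArrayList<>();

  // Called from another thread: record the handle and never touch the root.
  void receiverFinished(String handle) {
    receiverFinishedQueue.add(handle);
  }

  // Called on the run thread, for example between batches.
  void drainFinished() {
    String handle;
    while ((handle = receiverFinishedQueue.poll()) != null) {
      finishedSeenByRoot.add(handle); // stands in for root.receivingFragmentFinished(handle)
    }
  }

  List<String> finishedSeenByRoot() {
    return finishedSeenByRoot;
  }

  public static void main(String[] args) throws InterruptedException {
    RunLoopSketch exec = new RunLoopSketch();
    Thread control = new Thread(() -> {
      exec.receiverFinished("receiver-1");
      exec.receiverFinished("receiver-2");
    });
    control.start();
    control.join();
    exec.drainFinished(); // the main thread plays the role of the run thread
    System.out.println(exec.finishedSeenByRoot()); // [receiver-1, receiver-2]
  }
}
```

With this shape the root operator is only ever touched by one thread, which is what allows the synchronization inside the operator to be dropped.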


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173367462
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +527,66 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public synchronized void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (terminate.get()) {
    --- End diff --
    
    Thanks for catching. Fixed.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @sachouche I traced through the code. The updateAggregateStats method is called, which then calls the getOutgoingBatches method of the code-generated Partitioners. That method is just a simple getter, so no one else is acquiring the same lock. But even if someone else were, the code in the close method is single threaded, and synchronized blocks are reentrant.
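
A tiny illustration of the reentrancy point, assuming nothing about the real classes: `ReentrantSketch` and its methods are hypothetical, and the example relies only on the fact that Java intrinsic locks can be re-acquired by the thread that already holds them.

```java
// Hypothetical class; shows only that a synchronized method can call another one on the same object.
final class ReentrantSketch {
  private int calls = 0;

  synchronized void close() {
    calls++;
    updateStats(); // re-acquires the lock this thread already holds, so there is no deadlock
  }

  synchronized void updateStats() {
    calls++;
  }

  synchronized int calls() {
    return calls;
  }

  public static void main(String[] args) {
    ReentrantSketch sketch = new ReentrantSketch();
    sketch.close();
    System.out.println(sketch.calls()); // 2
  }
}
```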


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    +1


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    Looks good, @vrozov are you ok with the changes?


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is finish...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    Nope, never mind, I think I spoke too soon. I just realized we may get multiple receivingFragmentFinished requests, one for each downstream receiver. Back to the drawing board.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @ilooner @arina-ielchiieva How can `innerNext()` and `close()` execute concurrently? Doesn't `FragmentExecutor.hasCloseoutThread` ensure that either `close()` is called on the `run()` thread or `run()` exits if the fragment is already cancelled?


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173537487
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---
    @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and interrupts in the root thread have been cleared.
    +     */
    +    public void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
    +        // Finish request
    +        if (terminate.get()) {
    +          // We have already recieved a cancellation or we have terminated the event processor. Do not process anymore finish requests.
    +          return;
    +        }
    +      } else {
    +        // Cancel request
    +        if (!terminate.compareAndSet(false, true)) {
    +          // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests.
    +          // This prevents the root thread from being interrupted at an inappropriate time.
    +          return;
    +        }
    +      }
    +
           switch (event.type) {
             case CANCEL:
    -          /*
    -           * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
    -           */
    +          // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    -
    -          /*
    -           * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
    -           * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
    -           * procedure of the main thread.
    -          */
    -          synchronized (myThreadRef) {
    -            final Thread myThread = myThreadRef.get();
    -            if (myThread != null) {
    -              logger.debug("Interrupting fragment thread {}", myThread.getName());
    -              myThread.interrupt();
    -            }
    -          }
    +          // The root was started so we have to interrupt it in case it is performing a blocking operation.
    +          killThread();
    +          terminate.set(true);
               break;
    -
             case CANCEL_AND_FINISH:
    +          // In this case the root was never started so we do not have to interrupt the thread.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    +          // The FragmentExecutor#run() loop will not execute in this case so we have to cleanup resources here
               cleanup(FragmentState.FINISHED);
    +          terminate.set(true);
    --- End diff --
    
    Thanks for catching


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @vrozov Thanks for catching this, I believe you are right. hasCloseoutThread guarantees innerNext and close won't be called concurrently. However, I still believe innerNext and receivingFragmentFinished could be called concurrently, since the ControlMessageHandler thread executes receivingFragmentFinished. Additionally, in rare cases where a limit query is cancelled, receivingFragmentFinished and close could be called concurrently as well. While reflecting on your comments I also saw another issue where the root could be blocked on a next call but a Finished event would not cause the root to terminate.
    
    In light of all of this, I actually think **synchronized** is not sufficient. We will need some way to interrupt the execution of the root when we receive a finish signal, and only close out the resources after receivingFragmentFinished has been called. Similarly, if we receive a finish signal we should ignore any cancellation requests instead of trying to cancel and finish simultaneously, and vice versa.
    
    I will rework the solution to address these issues and let you know when I have an update.
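
A generic sketch of interrupting a thread that is blocked waiting for input, which is the kind of mechanism mentioned above for a root stuck in a next call. This is not Drill code: `InterruptSketch` is hypothetical, and a `LinkedBlockingQueue` that never receives data stands in for the blocking operation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a worker blocked on take() is woken up by Thread.interrupt().
final class InterruptSketch {
  // Returns true if the blocked worker observed the interrupt.
  static boolean runAndInterrupt() {
    BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    boolean[] sawInterrupt = {false};
    Thread root = new Thread(() -> {
      try {
        incoming.take(); // would block forever: nothing is ever offered
      } catch (InterruptedException e) {
        // Expected here, since the interrupt is how the worker is told to stop.
        sawInterrupt[0] = true;
      }
    });
    root.start();
    root.interrupt();
    try {
      root.join(); // join() also makes the worker's write to sawInterrupt visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sawInterrupt[0];
  }

  public static void main(String[] args) {
    System.out.println("worker interrupted: " + runAndInterrupt());
  }
}
```

The catch block is the analogue of swallowing the InterruptedException in the run method: the interrupt is intentional, so it is recorded rather than reported as a failure.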


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172327159
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -358,7 +357,7 @@ public void close() throws Exception {
         }
       }
     
    -  public void sendEmptyBatch(boolean isLast) {
    +  public synchronized void sendEmptyBatch(boolean isLast) {
    --- End diff --
    
    Done


---

[GitHub] drill pull request #1105: DRILL-6125: Fix possible memory leak when query is...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r172024863
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---
    @@ -231,7 +231,7 @@ public boolean innerNext() {
       }
     
       @VisibleForTesting
    -  protected void createPartitioner() throws SchemaChangeException {
    +  protected synchronized void createPartitioner() throws SchemaChangeException {
    --- End diff --
    
    Why do you need to synchronize `createPartitioner`? It's being called from `innerNext`, which is already synchronized.


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @sachouche @arina-ielchiieva 


---

[GitHub] drill issue #1105: DRILL-6125: Fix possible memory leak when query is cancel...

Posted by priteshm <gi...@git.apache.org>.
Github user priteshm commented on the issue:

    https://github.com/apache/drill/pull/1105
  
    @arina-ielchiieva is this bug ready to commit?


---