Posted to dev@drill.apache.org by vrozov <gi...@git.apache.org> on 2018/04/12 16:35:12 UTC

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

GitHub user vrozov opened a pull request:

    https://github.com/apache/drill/pull/1208

    DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation

    @ilooner Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vrozov/drill DRILL-6295

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1208
    
----
commit 20c917461536d14bc752c3085fad8799a107f6cc
Author: Vlad Rozov <vr...@...>
Date:   2018-04-11T17:12:07Z

    DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation

----


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181927070
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (runner.compareAndSet(null, thread)) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
             localStats.clear();
             localStats.startProcessing();
    -        iface.execute(part);
    -      } catch (IOException e) {
    -        exp = e;
    -      } finally {
    -        localStats.stopProcessing();
    -        currThread.setName(currThreadName);
    -        latch.countDown();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          if (state.get() == STATE.NEW) {
    +            iface.execute(partitioner);
    +          }
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        } finally {
    +          if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +            if (executionException == null) {
    +              localStats.stopProcessing();
    +              state.lazySet(STATE.NORMAL);
    +            } else {
    +              exception = executionException;
    +              state.lazySet(STATE.EXCEPTIONAL);
    +            }
    +          }
    +          if (count.decrementAndGet() == 0) {
    +            LockSupport.unpark(partitionerDecorator.thread);
    +          }
    +          thread.setName(name);
    +          while (state.get() == STATE.INTERRUPTING) {
    +            Thread.yield();
    +          }
    +          // Clear interrupt flag
    +          Thread.interrupted();
    +        }
    +      }
    +    }
    +
    +    void cancel(boolean mayInterruptIfRunning) {
    +      Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
    +          String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName()));
    +      if (runner.compareAndSet(null, partitionerDecorator.thread)) {
    +        if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
    +          ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
    +        }
    +        count.decrementAndGet();
    --- End diff --
    
    Agree, it is necessary to re-check `count` after canceling tasks.


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181915654
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (runner.compareAndSet(null, thread)) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
             localStats.clear();
             localStats.startProcessing();
    -        iface.execute(part);
    -      } catch (IOException e) {
    -        exp = e;
    -      } finally {
    -        localStats.stopProcessing();
    -        currThread.setName(currThreadName);
    -        latch.countDown();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          if (state.get() == STATE.NEW) {
    +            iface.execute(partitioner);
    +          }
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        } finally {
    +          if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +            if (executionException == null) {
    +              localStats.stopProcessing();
    +              state.lazySet(STATE.NORMAL);
    +            } else {
    +              exception = executionException;
    +              state.lazySet(STATE.EXCEPTIONAL);
    +            }
    +          }
    +          if (count.decrementAndGet() == 0) {
    +            LockSupport.unpark(partitionerDecorator.thread);
    +          }
    +          thread.setName(name);
    +          while (state.get() == STATE.INTERRUPTING) {
    +            Thread.yield();
    +          }
    +          // Clear interrupt flag
    +          Thread.interrupted();
    +        }
    +      }
    +    }
    +
    +    void cancel(boolean mayInterruptIfRunning) {
    +      Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
    +          String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName()));
    +      if (runner.compareAndSet(null, partitionerDecorator.thread)) {
    +        if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
    +          ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
    +        }
    +        count.decrementAndGet();
    --- End diff --
    
    The main PartitionerDecorator thread may not get unparked.
    
    1. Two PartitionerTasks are created.
    2. Before either PartitionerTask sets its runner, the main PartitionerDecorator thread enters await() and cancels each PartitionerTask.
    3. For each PartitionerTask, the cancel method executes:
    ```
            if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
              ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
            }
            count.decrementAndGet();
    ```
    4. The count goes to zero, but the main PartitionerDecorator thread still calls LockSupport.park() in await().
    5. Unpark is never called when the count reaches zero through cancel, so the main PartitionerDecorator thread can remain parked.
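
A minimal sketch of one way to avoid the missed wakeup described in the steps above, assuming the waiting thread re-checks `count` in a loop around `LockSupport.park()` instead of relying on a worker to call `unpark`. The class `AwaitSketch` and the helper `awaitZero` are hypothetical names used for illustration; they are not code from the pull request.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class AwaitSketch {

  // Hypothetical helper: returns once count is zero or less.
  // The count is checked before every park, so a count that was driven
  // to zero by the waiting thread itself (for example through cancel)
  // never leads to a park without a matching unpark.
  // Note: this sketch does not handle interruption of the waiting thread.
  static void awaitZero(AtomicInteger count) {
    while (count.get() > 0) {
      // park may return spuriously, hence the loop.
      LockSupport.park(count);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Case 1: the waiting thread decrements the count itself, no unpark is issued.
    AtomicInteger cancelled = new AtomicInteger(2);
    cancelled.decrementAndGet();
    cancelled.decrementAndGet();
    awaitZero(cancelled); // returns without parking
    System.out.println("cancelled count = " + cancelled.get());

    // Case 2: a worker finishes and unparks the waiting thread.
    AtomicInteger running = new AtomicInteger(1);
    Thread waiter = Thread.currentThread();
    Thread worker = new Thread(() -> {
      if (running.decrementAndGet() == 0) {
        LockSupport.unpark(waiter);
      }
    });
    worker.start();
    awaitZero(running);
    worker.join();
    System.out.println("running count = " + running.get());
  }
}
```

An `unpark` issued before the matching `park` is not lost, because `LockSupport` keeps a single permit per thread, so the remaining hazard is only the path on which nobody calls `unpark` at all.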


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181265596
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
         private volatile IOException exp;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
    --- End diff --
    
    I think there is a race condition here. Consider the following case:
    
    1. A PartitionerTask starts executing, let's call it **Task A**.
    2. **Task A** executes the state check `state.get() == STATE.NEW` and then its execution stops temporarily.
    3. The main PartitionerDecorator thread executes await(count, partitionerTasks).
    4. `context.getExecutorState().shouldContinue()` is false, so the PartitionerTasks are cancelled.
    5. The cancel method is called for **Task A**.
    6. In the cancel method, `state.compareAndSet(STATE.NEW, mayInterruptIfRunning ? STATE.INTERRUPTING : STATE.CANCELLED)` returns true.
    7. `Thread thread = runner.get();` is executed, but it returns null since **Task A** has not set the runner yet.
    8. The else statement in the cancel method is executed and `((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);` is called.
    9. The remove method does not cancel **Task A** since it has already started executing, and the interrupt is not set, so it continues running.
    10. `count.decrementAndGet();` is executed, so the count will be zero while **Task A** is still running.
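
One way to close a window like the one in the steps above, sketched under the assumption that both `run()` and `cancel()` claim the task through a single compare-and-set on `runner`, so exactly one of them decrements `count`. The class `ClaimSketch` and its nested `Task` are hypothetical, simplified stand-ins for illustration. The sketch omits the state machine and the interruption of a task that is already running.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ClaimSketch {

  // Hypothetical, simplified task: whoever wins the CAS on runner owns the task.
  static final class Task implements Runnable {
    private final AtomicReference<Thread> runner = new AtomicReference<>();
    private final AtomicInteger count;
    private final Runnable work;

    Task(AtomicInteger count, Runnable work) {
      this.count = count;
      this.work = work;
    }

    @Override
    public void run() {
      // Only the thread that claims the task executes the work.
      if (runner.compareAndSet(null, Thread.currentThread())) {
        try {
          work.run();
        } finally {
          count.decrementAndGet();
        }
      }
      // Otherwise the task was cancelled before it started and
      // the canceller has already decremented the count.
    }

    // Returns true only if the task was cancelled before it started running.
    boolean cancel() {
      if (runner.compareAndSet(null, Thread.currentThread())) {
        count.decrementAndGet();
        return true;
      }
      // The task is running or finished; it decrements the count itself,
      // so the caller still has to wait for the count to reach zero.
      return false;
    }
  }

  public static void main(String[] args) {
    AtomicInteger count = new AtomicInteger(2);
    Task cancelledFirst = new Task(count, () -> System.out.println("never printed"));
    Task ranFirst = new Task(count, () -> System.out.println("work executed"));

    System.out.println(cancelledFirst.cancel()); // cancel wins the claim
    cancelledFirst.run();                        // no-op, work is skipped

    ranFirst.run();                              // run wins the claim
    System.out.println(ranFirst.cancel());       // too late to cancel

    System.out.println("count = " + count.get());
  }
}
```

With a single claim point there is no interval in which the canceller sees a null runner while the task has already passed its own check.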


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181926928
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---
    @@ -161,8 +161,11 @@ public OperatorStats getStats() {
        * @param schemaChanged  true if the schema has changed
        */
       @Override
    -  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException {
    +  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException, InterruptedException {
         for (OutgoingRecordBatch batch : outgoingBatches) {
    +      if (Thread.interrupted()) {
    +        throw new InterruptedException();
    --- End diff --
    
    I'll revert throwing `InterruptedException` to avoid mishandling of the last batch.


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181851002
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,124 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    +      final Thread thread = Thread.currentThread();
    +      Preconditions.checkState(runner.compareAndSet(null, thread),
    +          "PartitionerTask can be executed only once.");
    +      if (state.get() == STATE.NEW) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
    +        localStats.clear();
    +        localStats.startProcessing();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          iface.execute(partitioner);
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        }
    +        if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +          if (executionException == null) {
    +            localStats.stopProcessing();
    +            state.lazySet(STATE.NORMAL);
    +          } else {
    +            exception = executionException;
    +            state.lazySet(STATE.EXCEPTIONAL);
    +          }
    +        }
    +        if (count.decrementAndGet() == 0) {
    +          LockSupport.unpark(partitionerDecorator.thread);
    +        }
    +        thread.setName(name);
    +      }
    +      runner.set(null);
    +      while (state.get() == STATE.INTERRUPTING) {
    +        Thread.yield();
    +      }
    +      // Clear interrupt flag
           try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    +        Thread.sleep(0);
    --- End diff --
    
    Could we use Thread.interrupted() instead? Javadoc suggests it's a good alternative to use for clearing the interrupt flag. Also it avoids an unnecessary yield on some JVM implementations.
    
    ```
        /**
         * Tests whether the current thread has been interrupted.  The
         * <i>interrupted status</i> of the thread is cleared by this method.  In
         * other words, if this method were to be called twice in succession, the
         * second call would return false (unless the current thread were
         * interrupted again, after the first call had cleared its interrupted
         * status and before the second call had examined it).
         *
         * <p>A thread interruption ignored because a thread was not alive
         * at the time of the interrupt will be reflected by this method
         * returning false.
         *
         * @return  <code>true</code> if the current thread has been interrupted;
         *          <code>false</code> otherwise.
         * @see #isInterrupted()
         * @revised 6.0
         */
    ```
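
A small illustration of the behaviour the quoted Javadoc describes: `Thread.interrupted()` reports the interrupt status and clears it in the same call. The class name `InterruptFlagSketch` and the helper `demo` are made up for this example.

```java
public class InterruptFlagSketch {

  // Sets the interrupt flag on the current thread, then reads it twice.
  static boolean[] demo() {
    Thread.currentThread().interrupt();
    boolean first = Thread.interrupted();  // reports the flag and clears it
    boolean second = Thread.interrupted(); // flag is already cleared
    return new boolean[] {first, second};
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println(result[0] + " " + result[1]); // prints "true false"
  }
}
```

Unlike `Thread.sleep(0)`, this does not require catching `InterruptedException` just to discard it.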


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181871311
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -118,105 +127,114 @@ public PartitionOutgoingBatch getOutgoingBatches(int index) {
         return null;
       }
     
    -  @VisibleForTesting
    -  protected List<Partitioner> getPartitioners() {
    +  List<Partitioner> getPartitioners() {
         return partitioners;
       }
     
       /**
        * Helper to execute the different methods wrapped into same logic
        * @param iface
    -   * @throws IOException
    +   * @throws ExecutionException
        */
    -  protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException {
    -    if (partitioners.size() == 1 ) {
    -      // no need for threads
    -      final OperatorStats localStatsSingle = partitioners.get(0).getStats();
    -      localStatsSingle.clear();
    -      localStatsSingle.startProcessing();
    +  @VisibleForTesting
    +  void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException {
    +    // To simulate interruption of main fragment thread and interrupting the partition threads, create a
    +    // CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or
    +    // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
    +    try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
    --- End diff --
    
    I see thx.


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181865979
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -118,105 +127,114 @@ public PartitionOutgoingBatch getOutgoingBatches(int index) {
         return null;
       }
     
    -  @VisibleForTesting
    -  protected List<Partitioner> getPartitioners() {
    +  List<Partitioner> getPartitioners() {
         return partitioners;
       }
     
       /**
        * Helper to execute the different methods wrapped into same logic
        * @param iface
    -   * @throws IOException
    +   * @throws ExecutionException
        */
    -  protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException {
    -    if (partitioners.size() == 1 ) {
    -      // no need for threads
    -      final OperatorStats localStatsSingle = partitioners.get(0).getStats();
    -      localStatsSingle.clear();
    -      localStatsSingle.startProcessing();
    +  @VisibleForTesting
    +  void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException {
    +    // To simulate interruption of main fragment thread and interrupting the partition threads, create a
    +    // CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or
    +    // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
    +    try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
    --- End diff --
    
    The `testCountDownLatch` is used only for testing and initialized to 1. The wait is on `count`. 


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1208


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181262812
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
         private volatile IOException exp;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
    --- End diff --
    
    Never mind, I think I see why.


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181858695
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -118,105 +127,114 @@ public PartitionOutgoingBatch getOutgoingBatches(int index) {
         return null;
       }
     
    -  @VisibleForTesting
    -  protected List<Partitioner> getPartitioners() {
    +  List<Partitioner> getPartitioners() {
         return partitioners;
       }
     
       /**
        * Helper to execute the different methods wrapped into same logic
        * @param iface
    -   * @throws IOException
    +   * @throws ExecutionException
        */
    -  protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException {
    -    if (partitioners.size() == 1 ) {
    -      // no need for threads
    -      final OperatorStats localStatsSingle = partitioners.get(0).getStats();
    -      localStatsSingle.clear();
    -      localStatsSingle.startProcessing();
    +  @VisibleForTesting
    +  void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException {
    +    // To simulate interruption of main fragment thread and interrupting the partition threads, create a
    +    // CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or
    +    // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
    +    try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
    --- End diff --
    
    I'm not sure we should use the injector to create a countdown latch here. My understanding is that a `partitioner-sender-latch` injection site has to be defined in the `drill.exec.testing.controls` property, which is intended only for testing; see `ControlsInjectionUtil.createLatch()`. The default value of `drill.exec.testing.controls` is empty, so `getLatch` would return a no-op latch because `partitioner-sender-latch` is undefined. Since we always want a countdown latch here (not just for testing), shouldn't we create one directly?
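
    The behaviour described in this comment can be modelled roughly as follows: a lookup returns a real latch only when the named site is configured, and a no-op latch otherwise. Everything here (`LatchInjectionSketch`, `SketchLatch`, the `Map`-based controls) is a hypothetical stand-in written for illustration; it is not Drill's injector API.

    ```java
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;

    // Hypothetical model of "configured site -> real latch, otherwise no-op".
    final class LatchInjectionSketch {

      /** Stand-in for an injectable latch; not Drill's interface. */
      interface SketchLatch {
        void await() throws InterruptedException;
        void countDown();
        boolean isNoop();
      }

      /** Returned when the site is not configured: await() returns immediately. */
      static final SketchLatch NOOP = new SketchLatch() {
        @Override public void await() { }
        @Override public void countDown() { }
        @Override public boolean isNoop() { return true; }
      };

      /** Looks up the site in the (assumed) controls map and builds a latch if it is present. */
      static SketchLatch getLatch(Map<String, Integer> controls, String site) {
        final Integer count = controls.get(site);
        if (count == null) {
          return NOOP;
        }
        final CountDownLatch latch = new CountDownLatch(count);
        return new SketchLatch() {
          @Override public void await() throws InterruptedException { latch.await(); }
          @Override public void countDown() { latch.countDown(); }
          @Override public boolean isNoop() { return false; }
        };
      }

      public static void main(String[] args) {
        Map<String, Integer> empty = Collections.emptyMap();
        System.out.println("no-op by default: " + getLatch(empty, "partitioner-sender-latch").isNoop());
      }
    }
    ```

    With this model, production code that calls `await()` on the returned latch does not block unless a test has configured the site.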


---

[GitHub] drill issue #1208: DRILL-6295: PartitionerDecorator may close partitioners w...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1208
  
    @parthchandra Please review


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181260542
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +279,120 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
         private volatile IOException exp;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (state.get() == STATE.NEW && runner.compareAndSet(null, thread)) {
    --- End diff --
    
    Why is this check necessary?


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181488592
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      Preconditions.checkState(runner.compareAndSet(null, thread),
    +          "PartitionerTask can be executed only once.");
    +      if (state.get() == STATE.NEW) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
             localStats.clear();
             localStats.startProcessing();
    -        iface.execute(part);
    -      } catch (IOException e) {
    -        exp = e;
    -      } finally {
    -        localStats.stopProcessing();
    -        currThread.setName(currThreadName);
    -        latch.countDown();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          iface.execute(partitioner);
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        }
    +        if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +          if (executionException == null) {
    +            localStats.stopProcessing();
    +            state.lazySet(STATE.NORMAL);
    +          } else {
    +            exception = executionException;
    +            state.lazySet(STATE.EXCEPTIONAL);
    +          }
    +        }
    +        if (count.decrementAndGet() == 0) {
    +          LockSupport.unpark(partitionerDecorator.thread);
    +        }
    +        thread.setName(name);
    +      }
    +      runner.set(null);
    +      while (state.get() == STATE.INTERRUPTING) {
    --- End diff --
    
    I think we can still leave a dirty interrupt flag. Consider the following case.
    
     1. We create a PartitionerTask **Task A**.
     2. **Task A** executes and reaches `thread.setName(name)` in the `run` method.
     3. Now a cancel is called.
     4. `thread.interrupt()` and `state.lazySet(STATE.INTERRUPTED)` are executed.
     5. Now **Task A** can continue executing.
     6. **Task A** checks `state.get() == STATE.INTERRUPTING`, which is false, so `Thread.sleep` is never called and the interrupt flag is never cleared.
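
    The interleaving above can be forced deterministically in a small stand-alone model. The sketch below is a simplification and not the `PartitionerTask` code: the `State` enum is reduced, the canceller is the calling thread, and the worker spins rather than blocks so that the pending interrupt is not consumed while it waits. All names are hypothetical.

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical model of the race: the worker misses the INTERRUPTING window.
    final class DirtyInterruptSketch {

      enum State { NEW, INTERRUPTING, INTERRUPTED }

      /** Returns true if the worker leaves its run body with the interrupt flag still set. */
      static boolean workerExitsWithInterruptFlag() {
        final AtomicReference<State> state = new AtomicReference<>(State.NEW);
        final AtomicBoolean workDone = new AtomicBoolean();
        final AtomicBoolean cancelDone = new AtomicBoolean();
        final AtomicBoolean flagAtExit = new AtomicBoolean();

        Thread worker = new Thread(() -> {
          workDone.set(true);                 // step 2: the work part of run() is over
          while (!cancelDone.get()) {         // step 5: resume only after the cancel finished
            Thread.yield();
          }
          while (state.get() == State.INTERRUPTING) { // step 6: false, loop body never runs
            Thread.yield();
          }
          flagAtExit.set(Thread.currentThread().isInterrupted());
        });
        worker.start();

        while (!workDone.get()) {
          Thread.yield();
        }
        state.set(State.INTERRUPTING);        // steps 3-4: cancel interrupts the worker
        worker.interrupt();
        state.set(State.INTERRUPTED);
        cancelDone.set(true);

        try {
          worker.join();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("unexpected interrupt of the calling thread", e);
        }
        return flagAtExit.get();
      }

      public static void main(String[] args) {
        System.out.println("flag left set: " + workerExitsWithInterruptFlag());
      }
    }
    ```

    In this model the flag survives because nothing on the worker's exit path consumes it; an unconditional `Thread.interrupted()` once the state is no longer `INTERRUPTING` would be one way to clear it.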


---

[GitHub] drill issue #1208: DRILL-6295: PartitionerDecorator may close partitioners w...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the issue:

    https://github.com/apache/drill/pull/1208
  
    +1


---

[GitHub] drill issue #1208: DRILL-6295: PartitionerDecorator may close partitioners w...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1208
  
    +1 LGTM. I do not see any more race conditions.


---

[GitHub] drill pull request #1208: DRILL-6295: PartitionerDecorator may close partiti...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181894189
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---
    @@ -161,8 +161,11 @@ public OperatorStats getStats() {
        * @param schemaChanged  true if the schema has changed
        */
       @Override
    -  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException {
    +  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException, InterruptedException {
         for (OutgoingRecordBatch batch : outgoingBatches) {
    +      if (Thread.interrupted()) {
    +        throw new InterruptedException();
    --- End diff --
    
    Since we are already checking for interrupts here, could we remove `Thread.currentThread().isInterrupted()` in the `flush(boolean schemaChanged)` method?
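
    For context on the two checks: `Thread.interrupted()` reads and clears the current thread's interrupt flag, whereas `isInterrupted()` only reads it. The sketch below shows both, plus a loop in the style of the diff above that turns a pending interrupt into an `InterruptedException`. The class `InterruptCheckSketch` and its batch list are hypothetical and unrelated to `PartitionerTemplate`.

    ```java
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical sketch contrasting Thread.interrupted() with isInterrupted().
    final class InterruptCheckSketch {

      /** Returns {flag seen by isInterrupted(), flag seen by interrupted(), flag afterwards}. */
      static boolean[] observe() {
        Thread.currentThread().interrupt();
        boolean peek = Thread.currentThread().isInterrupted();  // reads, does not clear
        boolean consume = Thread.interrupted();                 // reads and clears
        boolean after = Thread.currentThread().isInterrupted();
        return new boolean[] {peek, consume, after};
      }

      /** Counts batches, stopping with an exception if the thread has been interrupted. */
      static int flushAll(List<String> batches) throws InterruptedException {
        int flushed = 0;
        for (String batch : batches) {
          if (Thread.interrupted()) {
            throw new InterruptedException();
          }
          flushed++; // stands in for flushing one batch
        }
        return flushed;
      }

      /** Returns true if an interrupted flush throws and leaves the flag cleared. */
      static boolean flushStopsWhenInterrupted() {
        Thread.currentThread().interrupt();
        try {
          flushAll(Arrays.asList("a", "b"));
          return false;
        } catch (InterruptedException e) {
          return !Thread.currentThread().isInterrupted();
        }
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
        System.out.println("flush stops when interrupted: " + flushStopsWhenInterrupted());
      }
    }
    ```

    Because the loop check consumes the flag, a later `isInterrupted()` call on the same thread would no longer see that interrupt.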


---