Posted to dev@drill.apache.org by vrozov <gi...@git.apache.org> on 2017/11/17 01:33:39 UTC

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

GitHub user vrozov opened a pull request:

    https://github.com/apache/drill/pull/1041

    DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vrozov/drill DRILL-5961

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1041.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1041
    
----
commit f7e0c44c1277cc3fa0cdf466e01521974c98262d
Author: Vlad Rozov <vr...@apache.org>
Date:   2017-11-15T00:24:01Z

    DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments

commit 575dec0fdc0dc4efb50569afa568c06f21546e6e
Author: Vlad Rozov <vr...@apache.org>
Date:   2017-11-15T00:35:09Z

    DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments

----
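The failure mode named in the title can be modelled in a few lines. This is a minimal, hypothetical sketch, not Drill's code. `OldStyleBus`, the string handles, the explicit millisecond clock and `IllegalStateException` (standing in for FragmentSetupException) are assumptions. The 10-minute expiry is also an assumption, inferred from the "> 10 min" in the title; the `asMap()` call in the removed code suggests `recentlyFinishedFragments` was a Guava cache. Once a finished fragment's entry expires, a late message for that fragment no longer matches the "recently finished" check, so it falls through to the error path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the lookup logic removed by this PR (not Drill's code).
// Handles are plain strings and the current time is passed in explicitly.
class OldStyleBus {
  static final long TTL_MILLIS = 10 * 60 * 1000L; // assumed 10-minute expiry

  private final Map<String, Object> managers = new HashMap<>();
  // handle -> time at which the fragment finished
  private final Map<String, Long> recentlyFinished = new HashMap<>();

  void add(String handle, Object manager) {
    managers.put(handle, manager);
  }

  void remove(String handle, long nowMillis) {
    if (managers.remove(handle) != null) {
      recentlyFinished.put(handle, nowMillis);
    }
  }

  // Mirrors the removed getFragmentManager(): null for a recently finished
  // fragment, the manager if registered, otherwise an exception.
  Object get(String handle, long nowMillis) {
    Long finishedAt = recentlyFinished.get(handle);
    if (finishedAt != null && nowMillis - finishedAt < TTL_MILLIS) {
      return null; // late message for a recently finished fragment: ignored
    }
    Object m = managers.get(handle);
    if (m != null) {
      return m;
    }
    // Entry expired (or never existed): a late message now looks like a setup error.
    throw new IllegalStateException("Failed to receive plan fragment for id: " + handle);
  }

  // The approach taken by the PR: a plain lookup where null just means
  // that no manager is registered for the handle.
  Object getNewStyle(String handle) {
    return managers.get(handle);
  }

  public static void main(String[] args) {
    OldStyleBus bus = new OldStyleBus();
    bus.add("q1:1:0", new Object());
    bus.remove("q1:1:0", 0L);
    System.out.println(bus.get("q1:1:0", 60_000L)); // within TTL: prints null
    try {
      bus.get("q1:1:0", TTL_MILLIS + 1); // after expiry
    } catch (IllegalStateException e) {
      System.out.println("late message rejected: " + e.getMessage());
    }
    System.out.println(bus.getNewStyle("q1:1:0")); // prints null, no exception
  }
}
```

In this model the only thing separating "ignored" from "error" is whether the cache entry is still alive, which is why a long-running query can trip over it.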


---

[GitHub] drill issue #1041: DRILL-5961: For long running queries (> 10 min) Drill may...

Posted by priteshm <gi...@git.apache.org>.
Github user priteshm commented on the issue:

    https://github.com/apache/drill/pull/1041
  
    @parthchandra or @adityakishore, since you have made changes to these files before, can you review the changes here?


---

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1041


---

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1041#discussion_r155433245
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---
    @@ -277,7 +277,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
             @Override
             protected void cleanup() {
               runningFragments.remove(fragmentHandle);
    -          workBus.removeFragmentManager(fragmentHandle);
    +          if (!fragmentManager.isCancelled()) {
    +            workBus.removeFragmentManager(fragmentHandle, false);
    --- End diff --
    
    If cleanup is not called as a result of FragmentManager cancellation, it is part of the regular cleanup after the run is complete.
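The two removal paths described above can be sketched as follows. This is a hypothetical, simplified model, not Drill's code: `CleanupSketch`, `Manager`, `cancelFragment` and `cleanup` are illustrative stand-ins, and handles are plain strings. In the sketch, the cancellation path cancels and removes the manager in one step, so cleanup only removes, without cancelling, a manager whose run ended normally.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the cleanup logic in this diff (not Drill's code).
class CleanupSketch {
  static class Manager {
    private volatile boolean cancelled;
    void cancel() { cancelled = true; }
    boolean isCancelled() { return cancelled; }
  }

  final ConcurrentMap<String, Manager> managers = new ConcurrentHashMap<>();

  // Same shape as the new removeFragmentManager(handle, cancel) in the diff.
  boolean removeFragmentManager(String handle, boolean cancel) {
    Manager m = managers.remove(handle);
    if (m == null) {
      return false;
    }
    if (cancel) {
      m.cancel();
    }
    return true;
  }

  // Cancellation path: the manager is cancelled and removed together.
  void cancelFragment(String handle) {
    removeFragmentManager(handle, true);
  }

  // Runner cleanup: a cancelled manager was already removed by the cancellation
  // path; otherwise the run ended normally, so remove without cancelling.
  void cleanup(String handle, Manager m) {
    if (!m.isCancelled()) {
      removeFragmentManager(handle, false);
    }
  }

  public static void main(String[] args) {
    CleanupSketch bus = new CleanupSketch();
    Manager finished = new Manager();
    bus.managers.put("done", finished);
    bus.cleanup("done", finished);
    System.out.println(finished.isCancelled() + " " + bus.managers.containsKey("done")); // false false

    Manager cancelled = new Manager();
    bus.managers.put("cancelled", cancelled);
    bus.cancelFragment("cancelled");
    bus.cleanup("cancelled", cancelled); // skips the second removal
    System.out.println(cancelled.isCancelled() + " " + bus.managers.containsKey("cancelled")); // true false
  }
}
```

Because `ConcurrentMap.remove` is atomic, at most one of the two paths gets a non-null manager back for a given handle, even if cancellation and cleanup race.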


---

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1041#discussion_r155397359
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---
    @@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {
     
       public void addFragmentManager(final FragmentManager fragmentManager) {
         if (logger.isDebugEnabled()) {
    -      logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
    +      logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
         }
         final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
    -      if (old != null) {
    -        throw new IllegalStateException(
    -            "Tried to set fragment manager when has already been set for the provided fragment handle.");
    -    }
    -  }
    -
    -  public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
    -    synchronized (this) {
    -      return managers.get(handle);
    +    if (old != null) {
    +      throw new IllegalStateException(
    +          String.format("Manager {} for fragment {} already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
         }
       }
     
    -  public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
    -    synchronized (this) {
    -      // Check if this was a recently finished (completed or cancelled) fragment.  If so, throw away message.
    -      if (recentlyFinishedFragments.asMap().containsKey(handle)) {
    -        if (logger.isDebugEnabled()) {
    -          logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
    -        }
    -        return null;
    -      }
    -
    -      // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
    -      final FragmentManager m = managers.get(handle);
    -      if (m != null) {
    -        return m;
    -      }
    -    }
    -    throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
    -        + QueryIdHelper.getQueryIdentifier(handle));
    +  public FragmentManager getFragmentManager(final FragmentHandle handle) {
    +    return managers.get(handle);
       }
     
       /**
    -   * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
    -   * multiple times. The manager will be removed only once (the first call).
    -   * @param handle the handle to the fragment
    -   */
    -  public void removeFragmentManager(final FragmentHandle handle) {
    -    if (logger.isDebugEnabled()) {
    -      logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
    -    }
    -
    -    synchronized (this) {
    -      final FragmentManager manager = managers.get(handle);
    -      if (manager != null) {
    -        recentlyFinishedFragments.put(handle, 1);
    -        managers.remove(handle);
    -      } else {
    -        logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
    -   * for fragments waiting on data (root and intermediate).
    +   * Optionally cancels and removes fragment manager (for the corresponding the handle) from the work event bus. Currently, used
    +   * for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
    +   * only once (the first call).
        * @param handle the handle to the fragment
    +   * @param cancel
        * @return if the fragment was found and removed from the event bus
        */
    -  public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
    -    if (logger.isDebugEnabled()) {
    -      logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
    -    }
    -
    -    synchronized (this) {
    -      final FragmentManager manager = managers.get(handle);
    -      if (manager == null) {
    -        return false;
    +  public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
    +    final FragmentManager manager = managers.remove(handle);
    +    if (manager != null) {
    +      assert !manager.isCancelled() : String.format("Fragment {} manager {} is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
    +      if (cancel) {
    +        manager.cancel();
    +      }
    +      if (logger.isDebugEnabled()) {
    +        logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed",
    +            QueryIdHelper.getQueryIdentifier(handle), manager);
           }
    -
    -      manager.cancel();
    -      recentlyFinishedFragments.put(handle, 1);
    -      managers.remove(handle);
           return true;
    +    } else if (logger.isWarnEnabled()) {
    +      logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
         }
    +    return false;
    --- End diff --
    
    Seems like this is redundant. It doesn't seem like we ever return false, and the callers never seem to check the result.
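The contract of the new removeFragmentManager(handle, cancel) can be modelled with a ConcurrentMap, which is what makes the boolean meaningful: the first call for a handle returns true and any repeated call returns false. This is a hypothetical sketch, not the WorkEventBus code; `BusSketch` and `Manager` are stand-ins, handles are strings, and the assumption that `managers` is a ConcurrentMap is based on the `putIfAbsent` call in the diff. The sketch also formats messages with %s, because String.format does not expand SLF4J-style {} placeholders (the IllegalStateException and assert messages in the diff use {}).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the new add/get/remove contract (not Drill's WorkEventBus).
// Fragment handles are plain strings; Manager is a stand-in for FragmentManager.
class BusSketch {
  static class Manager {
    private volatile boolean cancelled;
    void cancel() { cancelled = true; }
    boolean isCancelled() { return cancelled; }
  }

  private final ConcurrentMap<String, Manager> managers = new ConcurrentHashMap<>();

  void addFragmentManager(String handle, Manager manager) {
    Manager old = managers.putIfAbsent(handle, manager);
    if (old != null) {
      // String.format needs %s; {} placeholders would be printed literally.
      throw new IllegalStateException(
          String.format("Manager %s for fragment %s already exists.", old, handle));
    }
  }

  // No exception for unknown handles: null means nothing is registered.
  Manager getFragmentManager(String handle) {
    return managers.get(handle);
  }

  // ConcurrentMap.remove is atomic: only the first call for a handle gets the
  // manager back (and returns true); repeated calls return false.
  boolean removeFragmentManager(String handle, boolean cancel) {
    Manager manager = managers.remove(handle);
    if (manager == null) {
      return false;
    }
    if (cancel) {
      manager.cancel();
    }
    return true;
  }

  public static void main(String[] args) {
    BusSketch bus = new BusSketch();
    bus.addFragmentManager("q1:2:0", new Manager());
    System.out.println(bus.removeFragmentManager("q1:2:0", true));  // true
    System.out.println(bus.removeFragmentManager("q1:2:0", false)); // false
    System.out.println(bus.getFragmentManager("q1:2:0"));           // null
  }
}
```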


---

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1041#discussion_r155433860
  
    ControlMessageHandler checks the result on line 196.
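One plausible way a caller can use that result is to fall back to another cancellation path when no pending manager was removed. The sketch below is hypothetical and does not reproduce ControlMessageHandler: `CancelSketch`, `pendingManagers`, `runningFragments`, `removePendingManager` and the returned strings are illustrative names only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cancel handler (not Drill's ControlMessageHandler): it relies on the
// boolean from the work-bus removal to decide whether a fallback is needed.
class CancelSketch {
  // Stand-in for managers of fragments waiting on data in the work bus.
  final Set<String> pendingManagers = ConcurrentHashMap.newKeySet();
  // Stand-in for fragments that are already running.
  final Set<String> runningFragments = ConcurrentHashMap.newKeySet();

  // Stand-in for workBus.removeFragmentManager(handle, true).
  boolean removePendingManager(String handle) {
    return pendingManagers.remove(handle);
  }

  String cancelFragment(String handle) {
    if (removePendingManager(handle)) {
      return "cancelled pending manager";
    }
    if (runningFragments.remove(handle)) {
      return "cancelled running fragment";
    }
    return "finished or unknown fragment; request dropped";
  }

  public static void main(String[] args) {
    CancelSketch handler = new CancelSketch();
    handler.pendingManagers.add("q1:1:0");
    handler.runningFragments.add("q1:0:0");
    System.out.println(handler.cancelFragment("q1:1:0")); // cancelled pending manager
    System.out.println(handler.cancelFragment("q1:0:0")); // cancelled running fragment
    System.out.println(handler.cancelFragment("q1:1:0")); // finished or unknown fragment; request dropped
  }
}
```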


---

[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1041#discussion_r155396889
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---
    @@ -277,7 +277,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
             @Override
             protected void cleanup() {
               runningFragments.remove(fragmentHandle);
    -          workBus.removeFragmentManager(fragmentHandle);
    +          if (!fragmentManager.isCancelled()) {
    +            workBus.removeFragmentManager(fragmentHandle, false);
    --- End diff --
    
    Not sure why you don't want to cancel here. 


---