You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:22:59 UTC

[3/3] drill git commit: DRILL-2878: FragmentExecutor.closeOutResources() is not called if an exception happens in the Foreman before the fragment executor starts running

DRILL-2878: FragmentExecutor.closeOutResources() is not called if an exception happens in the Foreman before the fragment executor starts running


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a392e532
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a392e532
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a392e532

Branch: refs/heads/master
Commit: a392e532290033577a6d2a803e2ca7264d7d15b9
Parents: 1c09c2f
Author: adeneche <ad...@gmail.com>
Authored: Tue May 5 20:59:14 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:22:39 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/work/WorkManager.java |  6 +-
 .../exec/work/fragment/FragmentExecutor.java    | 11 +++-
 .../fragment/TestFragmentExecutorCancel.java    | 66 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index a3ceb8f..3e4f3d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -265,7 +265,11 @@ public class WorkManager implements AutoCloseable {
     }
 
     public void startFragmentPendingRemote(final FragmentManager handler) {
-      executor.execute(handler.getRunnable());
+      final FragmentExecutor fragmentExecutor = handler.getRunnable();
+      // cancelled fragment managers will return null fragment executors
+      if (fragmentExecutor != null) {
+        executor.execute(fragmentExecutor);
+      }
     }
 
     public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index dc83cc6..7baafc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -101,10 +101,9 @@ public class FragmentExecutor implements Runnable {
       return null;
     }
 
-    final FragmentStatus status = AbstractStatusReporter
+    return AbstractStatusReporter
         .getBuilder(fragmentContext, FragmentState.RUNNING, null)
         .build();
-    return status;
   }
 
   /**
@@ -120,11 +119,17 @@ public class FragmentExecutor implements Runnable {
      * For example, consider the case when the Foreman sets up the root fragment executor which is
      * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
      * run() method on the root executor will never be called, and the executor will never be ready
-     * to accept external events. This will make the cancelling thread wait forever.
+     * to accept external events. This would make the cancelling thread wait forever if it were waiting on
+     * acceptExternalEvents.
      */
     synchronized (this) {
       if (root != null) {
         acceptExternalEvents.awaitUninterruptibly();
+      } else {
+        // This fragment may or may not start running. If it doesn't, closeOutResources() will never be called.
+        // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
+        // fragment never starts running.
+        closeOutResources();
       }
 
       /*

http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
new file mode 100644
index 0000000..eb5cc49
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+/**
+ * Runs a TPC-H query with an OutOfMemoryException injected in ScanBatch so that the query fails, then checks that the
+ * client can still be closed without an IllegalStateException after the fragments of the failed query are cancelled.
+ */
+public class TestFragmentExecutorCancel extends BaseTestQuery {
+
+  @Test
+  public void testCancelNonRunningFragments() throws Exception{
+    test("alter session set `planner.slice_target` = 10");
+
+    // Inject an out of memory exception in the ScanBatch
+    CoordinationProtos.DrillbitEndpoint endpoint = bits[0].getContext().getEndpoint();
+    String controlsString = "{\"injections\":[{"
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\","
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"" + "org.apache.drill.exec.physical.impl.ScanBatch" + "\","
+      + "\"desc\":\"" + "next-allocate" + "\","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1,"
+      + "\"exceptionClass\":\"" + "org.apache.drill.exec.memory.OutOfMemoryException" + "\""
+      + "}]}";
+    ControlsInjectionUtil.setControls(client, controlsString);
+
+    String query = getFile("queries/tpch/04.sql");
+
+    try {
+      test(query);
+      fail("The query should have failed!!!");
+    } catch(UserException uex) {
+      // The query should fail
+    }
+
+    try {
+      closeClient();
+    } catch (IllegalStateException ex) {
+      fail("failed to close the drillbits properly");
+    }
+  }
+}