You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:22:59 UTC
[3/3] drill git commit: DRILL-2878:
FragmentExecutor.closeOutResources() is not called if an exception happens in
the Foreman before the fragment executor starts running
DRILL-2878: FragmentExecutor.closeOutResources() is not called if an exception happens in the Foreman before the fragment executor starts running
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a392e532
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a392e532
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a392e532
Branch: refs/heads/master
Commit: a392e532290033577a6d2a803e2ca7264d7d15b9
Parents: 1c09c2f
Author: adeneche <ad...@gmail.com>
Authored: Tue May 5 20:59:14 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:22:39 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/work/WorkManager.java | 6 +-
.../exec/work/fragment/FragmentExecutor.java | 11 +++-
.../fragment/TestFragmentExecutorCancel.java | 66 ++++++++++++++++++++
3 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index a3ceb8f..3e4f3d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -265,7 +265,11 @@ public class WorkManager implements AutoCloseable {
}
public void startFragmentPendingRemote(final FragmentManager handler) {
- executor.execute(handler.getRunnable());
+ final FragmentExecutor fragmentExecutor = handler.getRunnable();
+ // cancelled fragment managers will return null fragment executors
+ if (fragmentExecutor != null) {
+ executor.execute(fragmentExecutor);
+ }
}
public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index dc83cc6..7baafc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -101,10 +101,9 @@ public class FragmentExecutor implements Runnable {
return null;
}
- final FragmentStatus status = AbstractStatusReporter
+ return AbstractStatusReporter
.getBuilder(fragmentContext, FragmentState.RUNNING, null)
.build();
- return status;
}
/**
@@ -120,11 +119,17 @@ public class FragmentExecutor implements Runnable {
* For example, consider the case when the Foreman sets up the root fragment executor which is
* waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
* run() method on the root executor will never be called, and the executor will never be ready
- * to accept external events. This will make the cancelling thread wait forever.
+ * to accept external events. This would make the cancelling thread wait forever, if it was waiting on
+ * acceptExternalEvents.
*/
synchronized (this) {
if (root != null) {
acceptExternalEvents.awaitUninterruptibly();
+ } else {
+ // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called.
+ // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
+ // fragment will never start running.
+ closeOutResources();
}
/*
http://git-wip-us.apache.org/repos/asf/drill/blob/a392e532/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
new file mode 100644
index 0000000..eb5cc49
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/TestFragmentExecutorCancel.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+/**
+ * Run several tpch queries and inject an OutOfMemoryException in ScanBatch that will cause an OUT_OF_MEMORY outcome to
+ * be propagated downstream. Make sure the proper "memory error" message is sent to the client.
+ */
+public class TestFragmentExecutorCancel extends BaseTestQuery {
+
+ @Test
+ public void testCancelNonRunningFragments() throws Exception{
+ test("alter session set `planner.slice_target` = 10");
+
+ // Inject an out of memory exception in the ScanBatch
+ CoordinationProtos.DrillbitEndpoint endpoint = bits[0].getContext().getEndpoint();
+ String controlsString = "{\"injections\":[{"
+ + "\"address\":\"" + endpoint.getAddress() + "\","
+ + "\"port\":\"" + endpoint.getUserPort() + "\","
+ + "\"type\":\"exception\","
+ + "\"siteClass\":\"" + "org.apache.drill.exec.physical.impl.ScanBatch" + "\","
+ + "\"desc\":\"" + "next-allocate" + "\","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1,"
+ + "\"exceptionClass\":\"" + "org.apache.drill.exec.memory.OutOfMemoryException" + "\""
+ + "}]}";
+ ControlsInjectionUtil.setControls(client, controlsString);
+
+ String query = getFile("queries/tpch/04.sql");
+
+ try {
+ test(query);
+ fail("The query should have failed!!!");
+ } catch(UserException uex) {
+ // The query should fail
+ }
+
+ try {
+ closeClient();
+ } catch (IllegalStateException ex) {
+ fail("failed to close the drillbits properly");
+ }
+ }
+}