You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/17 06:29:47 UTC
drill git commit: DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman
Repository: drill
Updated Branches:
refs/heads/master 6adeb9860 -> 75bd1d04b
DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman
closes #934
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/75bd1d04
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/75bd1d04
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/75bd1d04
Branch: refs/heads/master
Commit: 75bd1d04b01d23fc14730d6aba20964582990fa3
Parents: 6adeb98
Author: Vlad Rozov <vr...@apache.org>
Authored: Wed Sep 6 18:29:02 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Sat Sep 16 23:28:51 2017 -0700
----------------------------------------------------------------------
.../exec/work/fragment/FragmentExecutor.java | 2 +
.../work/fragment/FragmentStatusReporter.java | 25 ++++-
.../fragment/FragmentStatusReporterTest.java | 106 +++++++++++++++++++
3 files changed, 129 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index daa94b7..258e485 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -300,6 +300,7 @@ public class FragmentExecutor implements Runnable {
} else {
statusReporter.stateChanged(outcome);
}
+ statusReporter.close();
}
@@ -444,6 +445,7 @@ public class FragmentExecutor implements Runnable {
logger.warn("Foreman {} no longer active. Cancelling fragment {}.",
foremanEndpoint.getAddress(),
QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
+ statusReporter.close();
FragmentExecutor.this.cancel();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index 3dd9dc5..e37435c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -27,19 +27,21 @@ import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.control.ControlTunnel;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
* Foreman through a control tunnel.
*/
-public class FragmentStatusReporter {
+public class FragmentStatusReporter implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
private final FragmentContext context;
- private final ControlTunnel tunnel;
+ private final AtomicReference<ControlTunnel> tunnel;
public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
this.context = context;
- this.tunnel = tunnel;
+ this.tunnel = new AtomicReference<>(tunnel);
}
/**
@@ -98,7 +100,12 @@ public class FragmentStatusReporter {
}
private void sendStatus(final FragmentStatus status) {
- tunnel.sendFragmentStatus(status);
+ final ControlTunnel tunnel = this.tunnel.get();
+ if (tunnel != null) {
+ tunnel.sendFragmentStatus(status);
+ } else {
+ logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this);
+ }
}
/**
@@ -113,4 +120,14 @@ public class FragmentStatusReporter {
sendStatus(status);
}
+ @Override
+ public void close()
+ {
+ final ControlTunnel tunnel = this.tunnel.getAndSet(null);
+ if (tunnel != null) {
+ logger.debug("Closing {}", this);
+ } else {
+ logger.warn("{} was already closed", this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
new file mode 100644
index 0000000..d4d198b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentStats;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
+
+public class FragmentStatusReporterTest {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class);
+
+ private FragmentStatusReporter statusReporter;
+ private ControlTunnel tunnel;
+
+ @Before
+ public void setUp() throws Exception {
+ FragmentContext context = mock(FragmentContext.class);
+ when(context.getStats()).thenReturn(mock(FragmentStats.class));
+ when(context.getHandle()).thenReturn(FragmentHandle.getDefaultInstance());
+ when(context.getAllocator()).thenReturn(mock(BufferAllocator.class));
+ tunnel = mock(ControlTunnel.class);
+ statusReporter = new FragmentStatusReporter(context, tunnel);
+ }
+
+ @Test
+ public void testStateChanged() throws Exception {
+ for (FragmentState state : FragmentState.values()) {
+ try {
+ statusReporter.stateChanged(state);
+ if (state == FAILED) {
+ fail("Expected exception: " + IllegalStateException.class.getName());
+ }
+ } catch (IllegalStateException e) {
+ if (state != FAILED) {
+ fail("Unexpected exception: " + e.toString());
+ }
+ }
+ }
+ verify(tunnel, times(FragmentState.values().length - 2)) /* exclude SENDING and FAILED */
+ .sendFragmentStatus(any(FragmentStatus.class));
+ }
+
+ @Test
+ public void testFail() throws Exception {
+ statusReporter.fail(null);
+ verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ statusReporter.close();
+ verifyZeroInteractions(tunnel);
+ }
+
+ @Test
+ public void testCloseClosed() throws Exception {
+ statusReporter.close();
+ statusReporter.close();
+ verifyZeroInteractions(tunnel);
+ }
+
+ @Test
+ public void testStateChangedAfterClose() throws Exception {
+ statusReporter.stateChanged(RUNNING);
+ verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+ statusReporter.close();
+ statusReporter.stateChanged(CANCELLATION_REQUESTED);
+ verify(tunnel).sendFragmentStatus(any(FragmentStatus.class));
+ }
+}