You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/17 06:29:47 UTC

drill git commit: DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman

Repository: drill
Updated Branches:
  refs/heads/master 6adeb9860 -> 75bd1d04b


DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman

closes #934


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/75bd1d04
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/75bd1d04
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/75bd1d04

Branch: refs/heads/master
Commit: 75bd1d04b01d23fc14730d6aba20964582990fa3
Parents: 6adeb98
Author: Vlad Rozov <vr...@apache.org>
Authored: Wed Sep 6 18:29:02 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Sat Sep 16 23:28:51 2017 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    |   2 +
 .../work/fragment/FragmentStatusReporter.java   |  25 ++++-
 .../fragment/FragmentStatusReporterTest.java    | 106 +++++++++++++++++++
 3 files changed, 129 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index daa94b7..258e485 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -300,6 +300,7 @@ public class FragmentExecutor implements Runnable {
     } else {
       statusReporter.stateChanged(outcome);
     }
+    statusReporter.close();
   }
 
 
@@ -444,6 +445,7 @@ public class FragmentExecutor implements Runnable {
         logger.warn("Foreman {} no longer active.  Cancelling fragment {}.",
                     foremanEndpoint.getAddress(),
                     QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
+        statusReporter.close();
         FragmentExecutor.this.cancel();
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index 3dd9dc5..e37435c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -27,19 +27,21 @@ import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
  * Foreman through a control tunnel.
  */
-public class FragmentStatusReporter {
+public class FragmentStatusReporter implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
 
   private final FragmentContext context;
-  private final ControlTunnel tunnel;
+  private final AtomicReference<ControlTunnel> tunnel;
 
   public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
     this.context = context;
-    this.tunnel = tunnel;
+    this.tunnel = new AtomicReference<>(tunnel); // held in an AtomicReference so close() can swap it to null
   }
 
   /**
@@ -98,7 +100,12 @@ public class FragmentStatusReporter {
   }
 
   private void sendStatus(final FragmentStatus status) {
-    tunnel.sendFragmentStatus(status);
+    final ControlTunnel activeTunnel = tunnel.get(); // null once close() has run
+    if (activeTunnel == null) {
+      logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this);
+      return;
+    }
+    activeTunnel.sendFragmentStatus(status);
   }
 
   /**
@@ -113,4 +120,14 @@ public class FragmentStatusReporter {
     sendStatus(status);
   }
 
+  /** Drops the control tunnel reference so sendStatus stops forwarding; a repeated call only logs a warning. */
+  @Override
+  public void close() {
+    final ControlTunnel detached = tunnel.getAndSet(null);
+    if (detached == null) {
+      logger.warn("{} was already closed", this);
+      return;
+    }
+    logger.debug("Closing {}", this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/75bd1d04/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
new file mode 100644
index 0000000..d4d198b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentStats;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
+
+/** Unit tests for {@link FragmentStatusReporter}, run against a mocked {@link ControlTunnel}. */
+public class FragmentStatusReporterTest {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporterTest.class);
+  private ControlTunnel tunnel;
+  private FragmentStatusReporter statusReporter;
+
+  @Before
+  public void setUp() throws Exception {
+    tunnel = mock(ControlTunnel.class);
+    final FragmentContext fragmentContext = mock(FragmentContext.class);
+    when(fragmentContext.getAllocator()).thenReturn(mock(BufferAllocator.class));
+    when(fragmentContext.getHandle()).thenReturn(FragmentHandle.getDefaultInstance());
+    when(fragmentContext.getStats()).thenReturn(mock(FragmentStats.class));
+    statusReporter = new FragmentStatusReporter(fragmentContext, tunnel);
+  }
+
+  @Test
+  public void testStateChanged() throws Exception {
+    for (final FragmentState current : FragmentState.values()) {
+      try {
+        statusReporter.stateChanged(current);
+      } catch (IllegalStateException e) {
+        if (current != FAILED) {
+          fail("Unexpected exception: " + e.toString());
+        }
+        continue; // FAILED is the one state expected to be rejected
+      }
+      if (current == FAILED) {
+        fail("Expected exception: " + IllegalStateException.class.getName());
+      }
+    }
+    final int expectedSends = FragmentState.values().length - 2; /* exclude SENDING and FAILED */
+    verify(tunnel, times(expectedSends)).sendFragmentStatus(any(FragmentStatus.class));
+  }
+
+  @Test
+  public void testFail() throws Exception {
+    statusReporter.fail(null); // a status is still expected when no exception is supplied
+    verify(tunnel, times(1)).sendFragmentStatus(any(FragmentStatus.class));
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    statusReporter.close();
+    verifyZeroInteractions(tunnel); // closing alone must not touch the tunnel
+  }
+
+  @Test
+  public void testCloseClosed() throws Exception {
+    statusReporter.close();
+    statusReporter.close(); // a second close must not touch the tunnel either
+    verifyZeroInteractions(tunnel);
+  }
+
+  @Test
+  public void testStateChangedAfterClose() throws Exception {
+    statusReporter.stateChanged(RUNNING);
+    verify(tunnel, times(1)).sendFragmentStatus(any(FragmentStatus.class));
+    statusReporter.close();
+    statusReporter.stateChanged(CANCELLATION_REQUESTED);
+    verify(tunnel, times(1)).sendFragmentStatus(any(FragmentStatus.class)); // still one: nothing sent after close()
+  }
+}