You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/07/08 01:39:58 UTC

drill git commit: DRILL-3450: Moved methods from AbstractStatusReporter and NonRootStatusReporter to FragmentStatusReporter

Repository: drill
Updated Branches:
  refs/heads/master 48d8a59d1 -> 2506ecf15


DRILL-3450: Moved methods from AbstractStatusReporter and NonRootStatusReporter to FragmentStatusReporter

+ Removed StatusReporter interface
+ Refactored FragmentStatusReporter


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2506ecf1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2506ecf1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2506ecf1

Branch: refs/heads/master
Commit: 2506ecf1551bdb9a7dc6dbf950ba2c5c565eb1f4
Parents: 48d8a59
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Jul 1 21:39:53 2015 -0700
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Tue Jul 7 10:40:53 2015 -0700

----------------------------------------------------------------------
 .../exec/work/batch/ControlMessageHandler.java  |   6 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  10 +-
 .../drill/exec/work/foreman/QueryManager.java   |   9 --
 .../work/fragment/AbstractStatusReporter.java   |  94 ---------------
 .../exec/work/fragment/FragmentExecutor.java    |  30 +++--
 .../work/fragment/FragmentStatusReporter.java   | 116 +++++++++++++++++++
 .../work/fragment/NonRootFragmentManager.java   |   4 +-
 .../work/fragment/NonRootStatusReporter.java    |  44 -------
 .../exec/work/fragment/StatusReporter.java      |  30 -----
 9 files changed, 139 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 9f302a2..1c0eb80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -44,8 +44,8 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
 public class ControlMessageHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
@@ -130,8 +130,8 @@ public class ControlMessageHandler {
         final FragmentContext context = new FragmentContext(drillbitContext, fragment,
             drillbitContext.getFunctionImplementationRegistry());
         final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
-        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
-        final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener);
+        final FragmentStatusReporter statusReporter = new FragmentStatusReporter(context, tunnel);
+        final FragmentExecutor fr = new FragmentExecutor(context, fragment, statusReporter);
         bee.addFragmentRunner(fr);
       } else {
         // isIntermediate, store for incoming data.

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 716fb66..671deae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -69,6 +69,7 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -81,6 +82,7 @@ import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -125,8 +127,6 @@ public class Foreman implements Runnable {
 
   private volatile DistributedLease lease; // used to limit the number of concurrent queries
 
-  private FragmentExecutor rootRunner; // root Fragment
-
   private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
   private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
@@ -935,8 +935,9 @@ public class Foreman implements Runnable {
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    rootRunner = new FragmentExecutor(rootContext, rootFragment,
-        queryManager.newRootStatusHandler(rootContext, drillbitContext),
+    final ControlTunnel tunnel = drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
+    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment,
+        new FragmentStatusReporter(rootContext, tunnel),
         rootOperator);
     final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
@@ -945,7 +946,6 @@ public class Foreman implements Runnable {
       bee.addFragmentRunner(fragmentManager.getRunnable());
     } else {
       // if we do, record the fragment manager in the workBus.
-      // TODO aren't we managing our own work? What does this do? It looks like this will never get run
       drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 9318233..554a279 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -44,7 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStore;
@@ -52,8 +50,6 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-import org.apache.drill.exec.work.fragment.StatusReporter;
 
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import com.google.common.base.Preconditions;
@@ -451,11 +447,6 @@ public class QueryManager {
     }
   }
 
-  public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
-    final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
-    return new NonRootStatusReporter(context, tunnel);
-  }
-
   public FragmentStatusListener getFragmentStatusListener(){
     return fragmentStatusListener;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
deleted file mode 100644
index 8a40f1b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
-public abstract class AbstractStatusReporter implements StatusReporter{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
-
-  private final FragmentContext context;
-
-  public AbstractStatusReporter(final FragmentContext context) {
-    super();
-    this.context = context;
-  }
-
-  private  FragmentStatus.Builder getBuilder(final FragmentState state){
-    return getBuilder(context, state, null);
-  }
-
-  public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){
-    final FragmentStatus.Builder status = FragmentStatus.newBuilder();
-    final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
-    context.getStats().addMetricsToStatus(b);
-    b.setState(state);
-    if(ex != null){
-      final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-      b.setError(ex.getOrCreatePBError(verbose));
-    }
-    status.setHandle(context.getHandle());
-    b.setMemoryUsed(context.getAllocator().getAllocatedMemory());
-    b.setMinorFragmentId(context.getHandle().getMinorFragmentId());
-    status.setProfile(b);
-    return status;
-  }
-
-  @Override
-  public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
-    final FragmentStatus.Builder status = getBuilder(newState);
-    logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState);
-    switch(newState){
-    case AWAITING_ALLOCATION:
-    case CANCELLATION_REQUESTED:
-    case CANCELLED:
-    case FINISHED:
-    case RUNNING:
-      statusChange(handle, status.build());
-      break;
-    case SENDING:
-      // no op.
-      break;
-    case FAILED:
-      // shouldn't get here since fail() should be called.
-    default:
-      throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
-    }
-  }
-
-  protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
-
-  @Override
-  public final void fail(final FragmentHandle handle, final UserException excep) {
-    final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
-    fail(handle, status);
-  }
-
-  private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) {
-    statusChange(handle, statusBuilder.build());
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 9eec782..6d03f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -58,7 +58,7 @@ public class FragmentExecutor implements Runnable {
   private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
   private final String fragmentName;
   private final FragmentContext fragmentContext;
-  private final StatusReporter listener;
+  private final FragmentStatusReporter statusReporter;
   private final DeferredException deferredException = new DeferredException();
   private final PlanFragment fragment;
   private final FragmentRoot rootOperator;
@@ -75,12 +75,11 @@ public class FragmentExecutor implements Runnable {
    *
    * @param context
    * @param fragment
-   * @param listener
-   * @param rootOperator
+   * @param statusReporter
    */
   public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
-      final StatusReporter listener) {
-    this(context, fragment, listener, null);
+      final FragmentStatusReporter statusReporter) {
+    this(context, fragment, statusReporter, null);
   }
 
   /**
@@ -88,13 +87,13 @@ public class FragmentExecutor implements Runnable {
    *
    * @param context
    * @param fragment
-   * @param listener
+   * @param statusReporter
    * @param rootOperator
    */
   public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
-      final StatusReporter listener, final FragmentRoot rootOperator) {
+      final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) {
     this.fragmentContext = context;
-    this.listener = listener;
+    this.statusReporter = statusReporter;
     this.fragment = fragment;
     this.rootOperator = rootOperator;
     this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
@@ -131,9 +130,7 @@ public class FragmentExecutor implements Runnable {
       return null;
     }
 
-    return AbstractStatusReporter
-        .getBuilder(fragmentContext, FragmentState.RUNNING, null)
-        .build();
+    return statusReporter.getStatus(FragmentState.RUNNING);
   }
 
   /**
@@ -327,9 +324,9 @@ public class FragmentExecutor implements Runnable {
           .addIdentity(getContext().getIdentity())
           .addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId())
           .build(logger);
-      listener.fail(fragmentContext.getHandle(), uex);
+      statusReporter.fail(uex);
     } else {
-      listener.stateChanged(fragmentContext.getHandle(), outcome);
+      statusReporter.stateChanged(outcome);
     }
   }
 
@@ -362,7 +359,6 @@ public class FragmentExecutor implements Runnable {
   }
 
   private synchronized boolean updateState(FragmentState target) {
-    final FragmentHandle handle = fragmentContext.getHandle();
     final FragmentState current = fragmentState.get();
     logger.info(fragmentName + ": State change requested {} --> {}", current, target);
     switch (target) {
@@ -372,7 +368,7 @@ public class FragmentExecutor implements Runnable {
       case AWAITING_ALLOCATION:
       case RUNNING:
         fragmentState.set(target);
-        listener.stateChanged(handle, target);
+        statusReporter.stateChanged(target);
         return true;
 
       default:
@@ -390,7 +386,7 @@ public class FragmentExecutor implements Runnable {
     case FAILED:
       if(!isTerminal(current)){
         fragmentState.set(target);
-        // don't notify listener until we finalize this terminal state.
+        // don't notify reporter until we finalize this terminal state.
         return true;
       } else if (current == FragmentState.FAILED) {
         // no warn since we can call fail multiple times.
@@ -406,7 +402,7 @@ public class FragmentExecutor implements Runnable {
     case RUNNING:
       if(current == FragmentState.AWAITING_ALLOCATION){
         fragmentState.set(target);
-        listener.stateChanged(handle, target);
+        statusReporter.stateChanged(target);
         return true;
       }else{
         errorStateChange(current, target);

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
new file mode 100644
index 0000000..3dd9dc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+
+/**
+ * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
+ * Foreman through a control tunnel.
+ */
+public class FragmentStatusReporter {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
+
+  private final FragmentContext context;
+  private final ControlTunnel tunnel;
+
+  public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
+    this.context = context;
+    this.tunnel = tunnel;
+  }
+
+  /**
+   * Returns a {@link FragmentStatus} with the given state. {@link FragmentStatus} has additional information like
+   * metrics, etc. that is gathered from the {@link FragmentContext}.
+   *
+   * @param state the state to include in the status
+   * @return the status
+   */
+  FragmentStatus getStatus(final FragmentState state) {
+    return getStatus(state, null);
+  }
+
+  private FragmentStatus getStatus(final FragmentState state, final UserException ex) {
+    final FragmentStatus.Builder status = FragmentStatus.newBuilder();
+    final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+    context.getStats().addMetricsToStatus(b);
+    b.setState(state);
+    if (ex != null) {
+      final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+      b.setError(ex.getOrCreatePBError(verbose));
+    }
+    status.setHandle(context.getHandle());
+    b.setMemoryUsed(context.getAllocator().getAllocatedMemory());
+    b.setMinorFragmentId(context.getHandle().getMinorFragmentId());
+    status.setProfile(b);
+    return status.build();
+  }
+
+  /**
+   * Reports the state change to the Foreman. The state is wrapped in a {@link FragmentStatus} that has additional
+   * information like metrics, etc. This additional information is gathered from the {@link FragmentContext}.
+   * NOTE: Use {@link #fail} to report state change to {@link FragmentState#FAILED}.
+   *
+   * @param newState the new state
+   */
+  void stateChanged(final FragmentState newState) {
+    final FragmentStatus status = getStatus(newState, null);
+    logger.info("{}: State to report: {}", QueryIdHelper.getQueryIdentifier(context.getHandle()), newState);
+    switch (newState) {
+    case AWAITING_ALLOCATION:
+    case CANCELLATION_REQUESTED:
+    case CANCELLED:
+    case FINISHED:
+    case RUNNING:
+      sendStatus(status);
+      break;
+    case SENDING:
+      // no op.
+      break;
+    case FAILED:
+      // shouldn't get here since fail() should be called.
+    default:
+      throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
+    }
+  }
+
+  private void sendStatus(final FragmentStatus status) {
+    tunnel.sendFragmentStatus(status);
+  }
+
+  /**
+   * {@link FragmentStatus} with the {@link FragmentState#FAILED} state is reported to the Foreman. The
+   * {@link FragmentStatus} has additional information like metrics, etc. that is gathered from the
+   * {@link FragmentContext}.
+   *
+   * @param ex the exception related to the failure
+   */
+  void fail(final UserException ex) {
+    final FragmentStatus status = getStatus(FragmentState.FAILED, ex);
+    sendStatus(status);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 77440c5..3fc757c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -55,8 +55,8 @@ public class NonRootFragmentManager implements FragmentManager {
       this.handle = fragment.getHandle();
       this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(fragment, this.context);
-      final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
-          fragment.getForeman()));
+      final FragmentStatusReporter reporter = new FragmentStatusReporter(this.context,
+          context.getController().getTunnel(fragment.getForeman()));
       this.runner = new FragmentExecutor(this.context, fragment, reporter);
       this.context.setBuffers(buffers);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
deleted file mode 100644
index 71da12b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-
-/**
- * For all non root fragments, status will be reported back to the foreman through a control tunnel.
- */
-public class NonRootStatusReporter extends AbstractStatusReporter{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootStatusReporter.class);
-
-  private final ControlTunnel tunnel;
-
-  public NonRootStatusReporter(FragmentContext context, ControlTunnel tunnel) {
-    super(context);
-    this.tunnel = tunnel;
-  }
-
-  @Override
-  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-    logger.debug("Sending status change message message to remote node: " + status);
-    tunnel.sendFragmentStatus(status);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
deleted file mode 100644
index 424e7fb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-
-/**
- * The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman.
- */
-public interface StatusReporter {
-  void fail(FragmentHandle handle, UserException excep);
-  void stateChanged(FragmentHandle handle, FragmentState newState);
-}