You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/07/08 01:39:58 UTC
drill git commit: DRILL-3450: Moved methods from AbstractStatusReporter and NonRootStatusReporter to FragmentStatusReporter
Repository: drill
Updated Branches:
refs/heads/master 48d8a59d1 -> 2506ecf15
DRILL-3450: Moved methods from AbstractStatusReporter and NonRootStatusReporter to FragmentStatusReporter
+ Removed StatusReporter interface
+ Refactored FragmentStatusReporter
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2506ecf1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2506ecf1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2506ecf1
Branch: refs/heads/master
Commit: 2506ecf1551bdb9a7dc6dbf950ba2c5c565eb1f4
Parents: 48d8a59
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Jul 1 21:39:53 2015 -0700
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Tue Jul 7 10:40:53 2015 -0700
----------------------------------------------------------------------
.../exec/work/batch/ControlMessageHandler.java | 6 +-
.../apache/drill/exec/work/foreman/Foreman.java | 10 +-
.../drill/exec/work/foreman/QueryManager.java | 9 --
.../work/fragment/AbstractStatusReporter.java | 94 ---------------
.../exec/work/fragment/FragmentExecutor.java | 30 +++--
.../work/fragment/FragmentStatusReporter.java | 116 +++++++++++++++++++
.../work/fragment/NonRootFragmentManager.java | 4 +-
.../work/fragment/NonRootStatusReporter.java | 44 -------
.../exec/work/fragment/StatusReporter.java | 30 -----
9 files changed, 139 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 9f302a2..1c0eb80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -44,8 +44,8 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
public class ControlMessageHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
@@ -130,8 +130,8 @@ public class ControlMessageHandler {
final FragmentContext context = new FragmentContext(drillbitContext, fragment,
drillbitContext.getFunctionImplementationRegistry());
final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
- final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener);
+ final FragmentStatusReporter statusReporter = new FragmentStatusReporter(context, tunnel);
+ final FragmentExecutor fr = new FragmentExecutor(context, fragment, statusReporter);
bee.addFragmentRunner(fr);
} else {
// isIntermediate, store for incoming data.
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 716fb66..671deae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -69,6 +69,7 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
@@ -81,6 +82,7 @@ import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.codehaus.jackson.map.ObjectMapper;
@@ -125,8 +127,6 @@ public class Foreman implements Runnable {
private volatile DistributedLease lease; // used to limit the number of concurrent queries
- private FragmentExecutor rootRunner; // root Fragment
-
private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
@@ -935,8 +935,9 @@ public class Foreman implements Runnable {
queryManager.addFragmentStatusTracker(rootFragment, true);
- rootRunner = new FragmentExecutor(rootContext, rootFragment,
- queryManager.newRootStatusHandler(rootContext, drillbitContext),
+ final ControlTunnel tunnel = drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
+ final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment,
+ new FragmentStatusReporter(rootContext, tunnel),
rootOperator);
final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
@@ -945,7 +946,6 @@ public class Foreman implements Runnable {
bee.addFragmentRunner(fragmentManager.getRunnable());
} else {
// if we do, record the fragment manager in the workBus.
- // TODO aren't we managing our own work? What does this do? It looks like this will never get run
drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 9318233..554a279 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -44,7 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStore;
@@ -52,8 +50,6 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-import org.apache.drill.exec.work.fragment.StatusReporter;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.google.common.base.Preconditions;
@@ -451,11 +447,6 @@ public class QueryManager {
}
}
- public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
- final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
- return new NonRootStatusReporter(context, tunnel);
- }
-
public FragmentStatusListener getFragmentStatusListener(){
return fragmentStatusListener;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
deleted file mode 100644
index 8a40f1b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
-public abstract class AbstractStatusReporter implements StatusReporter{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
-
- private final FragmentContext context;
-
- public AbstractStatusReporter(final FragmentContext context) {
- super();
- this.context = context;
- }
-
- private FragmentStatus.Builder getBuilder(final FragmentState state){
- return getBuilder(context, state, null);
- }
-
- public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){
- final FragmentStatus.Builder status = FragmentStatus.newBuilder();
- final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
- context.getStats().addMetricsToStatus(b);
- b.setState(state);
- if(ex != null){
- final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
- b.setError(ex.getOrCreatePBError(verbose));
- }
- status.setHandle(context.getHandle());
- b.setMemoryUsed(context.getAllocator().getAllocatedMemory());
- b.setMinorFragmentId(context.getHandle().getMinorFragmentId());
- status.setProfile(b);
- return status;
- }
-
- @Override
- public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
- final FragmentStatus.Builder status = getBuilder(newState);
- logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState);
- switch(newState){
- case AWAITING_ALLOCATION:
- case CANCELLATION_REQUESTED:
- case CANCELLED:
- case FINISHED:
- case RUNNING:
- statusChange(handle, status.build());
- break;
- case SENDING:
- // no op.
- break;
- case FAILED:
- // shouldn't get here since fail() should be called.
- default:
- throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
- }
- }
-
- protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
-
- @Override
- public final void fail(final FragmentHandle handle, final UserException excep) {
- final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
- fail(handle, status);
- }
-
- private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) {
- statusChange(handle, statusBuilder.build());
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 9eec782..6d03f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -58,7 +58,7 @@ public class FragmentExecutor implements Runnable {
private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
private final String fragmentName;
private final FragmentContext fragmentContext;
- private final StatusReporter listener;
+ private final FragmentStatusReporter statusReporter;
private final DeferredException deferredException = new DeferredException();
private final PlanFragment fragment;
private final FragmentRoot rootOperator;
@@ -75,12 +75,11 @@ public class FragmentExecutor implements Runnable {
*
* @param context
* @param fragment
- * @param listener
- * @param rootOperator
+ * @param statusReporter
*/
public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
- final StatusReporter listener) {
- this(context, fragment, listener, null);
+ final FragmentStatusReporter statusReporter) {
+ this(context, fragment, statusReporter, null);
}
/**
@@ -88,13 +87,13 @@ public class FragmentExecutor implements Runnable {
*
* @param context
* @param fragment
- * @param listener
+ * @param statusReporter
* @param rootOperator
*/
public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
- final StatusReporter listener, final FragmentRoot rootOperator) {
+ final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) {
this.fragmentContext = context;
- this.listener = listener;
+ this.statusReporter = statusReporter;
this.fragment = fragment;
this.rootOperator = rootOperator;
this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
@@ -131,9 +130,7 @@ public class FragmentExecutor implements Runnable {
return null;
}
- return AbstractStatusReporter
- .getBuilder(fragmentContext, FragmentState.RUNNING, null)
- .build();
+ return statusReporter.getStatus(FragmentState.RUNNING);
}
/**
@@ -327,9 +324,9 @@ public class FragmentExecutor implements Runnable {
.addIdentity(getContext().getIdentity())
.addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId())
.build(logger);
- listener.fail(fragmentContext.getHandle(), uex);
+ statusReporter.fail(uex);
} else {
- listener.stateChanged(fragmentContext.getHandle(), outcome);
+ statusReporter.stateChanged(outcome);
}
}
@@ -362,7 +359,6 @@ public class FragmentExecutor implements Runnable {
}
private synchronized boolean updateState(FragmentState target) {
- final FragmentHandle handle = fragmentContext.getHandle();
final FragmentState current = fragmentState.get();
logger.info(fragmentName + ": State change requested {} --> {}", current, target);
switch (target) {
@@ -372,7 +368,7 @@ public class FragmentExecutor implements Runnable {
case AWAITING_ALLOCATION:
case RUNNING:
fragmentState.set(target);
- listener.stateChanged(handle, target);
+ statusReporter.stateChanged(target);
return true;
default:
@@ -390,7 +386,7 @@ public class FragmentExecutor implements Runnable {
case FAILED:
if(!isTerminal(current)){
fragmentState.set(target);
- // don't notify listener until we finalize this terminal state.
+ // don't notify reporter until we finalize this terminal state.
return true;
} else if (current == FragmentState.FAILED) {
// no warn since we can call fail multiple times.
@@ -406,7 +402,7 @@ public class FragmentExecutor implements Runnable {
case RUNNING:
if(current == FragmentState.AWAITING_ALLOCATION){
fragmentState.set(target);
- listener.stateChanged(handle, target);
+ statusReporter.stateChanged(target);
return true;
}else{
errorStateChange(current, target);
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
new file mode 100644
index 0000000..3dd9dc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+
+/**
+ * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
+ * Foreman through a control tunnel. Each report is a {@link FragmentStatus} built from the {@link FragmentContext}.
+ */
+public class FragmentStatusReporter {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
+
+ private final FragmentContext context; // supplies the handle, stats, options and allocator read when building a status
+ private final ControlTunnel tunnel; // every status built by this reporter is sent through this tunnel
+ /** Creates a reporter that describes {@code context} and sends every status through {@code tunnel}. */
+ public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
+ this.context = context;
+ this.tunnel = tunnel;
+ }
+
+ /**
+ * Returns a {@link FragmentStatus} with the given state and no error. {@link FragmentStatus} has additional
+ * information like metrics, etc. that is gathered from the {@link FragmentContext}. Nothing is sent by this method.
+ *
+ * @param state the state to include in the status
+ * @return the status
+ */
+ FragmentStatus getStatus(final FragmentState state) {
+ return getStatus(state, null);
+ }
+ /** Builds the status: the fragment handle plus a profile (metrics, state, optional error, memory used, minor id). */
+ private FragmentStatus getStatus(final FragmentState state, final UserException ex) {
+ final FragmentStatus.Builder status = FragmentStatus.newBuilder();
+ final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+ context.getStats().addMetricsToStatus(b); // the context's stats object fills in the metrics part of the profile
+ b.setState(state);
+ if (ex != null) { // an error is attached only when the caller supplies an exception
+ final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; // this option is passed on as the verbosity flag of the error
+ b.setError(ex.getOrCreatePBError(verbose));
+ }
+ status.setHandle(context.getHandle());
+ b.setMemoryUsed(context.getAllocator().getAllocatedMemory());
+ b.setMinorFragmentId(context.getHandle().getMinorFragmentId());
+ status.setProfile(b);
+ return status.build();
+ }
+
+ /**
+ * Reports the state change to the Foreman. The state is wrapped in a {@link FragmentStatus} that has additional
+ * information like metrics, etc. This additional information is gathered from the {@link FragmentContext}.
+ * NOTE: Use {@link #fail} to report state change to {@link FragmentState#FAILED}.
+ * @throws IllegalStateException if {@code newState} is {@link FragmentState#FAILED} or a state not handled below
+ * @param newState the new state; {@link FragmentState#SENDING} is logged but not sent
+ */
+ void stateChanged(final FragmentState newState) {
+ final FragmentStatus status = getStatus(newState, null); // built up front, even for states that end up not being sent
+ logger.info("{}: State to report: {}", QueryIdHelper.getQueryIdentifier(context.getHandle()), newState);
+ switch (newState) {
+ case AWAITING_ALLOCATION:
+ case CANCELLATION_REQUESTED:
+ case CANCELLED:
+ case FINISHED:
+ case RUNNING:
+ sendStatus(status);
+ break;
+ case SENDING:
+ // no op: this state is not sent through the tunnel.
+ break;
+ case FAILED:
+ // shouldn't get here since fail() should be called; falls through to the exception below.
+ default:
+ throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
+ }
+ }
+ /** Sends the given status through the control tunnel. */
+ private void sendStatus(final FragmentStatus status) {
+ tunnel.sendFragmentStatus(status);
+ }
+
+ /**
+ * {@link FragmentStatus} with the {@link FragmentState#FAILED} state is reported to the Foreman. The
+ * {@link FragmentStatus} has additional information like metrics, etc. that is gathered from the
+ * {@link FragmentContext}. The given exception is attached to the status as its error.
+ *
+ * @param ex the exception related to the failure
+ */
+ void fail(final UserException ex) {
+ final FragmentStatus status = getStatus(FragmentState.FAILED, ex);
+ sendStatus(status);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 77440c5..3fc757c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -55,8 +55,8 @@ public class NonRootFragmentManager implements FragmentManager {
this.handle = fragment.getHandle();
this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(fragment, this.context);
- final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
- fragment.getForeman()));
+ final FragmentStatusReporter reporter = new FragmentStatusReporter(this.context,
+ context.getController().getTunnel(fragment.getForeman()));
this.runner = new FragmentExecutor(this.context, fragment, reporter);
this.context.setBuffers(buffers);
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
deleted file mode 100644
index 71da12b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-
-/**
- * For all non root fragments, status will be reported back to the foreman through a control tunnel.
- */
-public class NonRootStatusReporter extends AbstractStatusReporter{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootStatusReporter.class);
-
- private final ControlTunnel tunnel;
-
- public NonRootStatusReporter(FragmentContext context, ControlTunnel tunnel) {
- super(context);
- this.tunnel = tunnel;
- }
-
- @Override
- protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- logger.debug("Sending status change message message to remote node: " + status);
- tunnel.sendFragmentStatus(status);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
deleted file mode 100644
index 424e7fb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.fragment;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-
-/**
- * The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman.
- */
-public interface StatusReporter {
- void fail(FragmentHandle handle, UserException excep);
- void stateChanged(FragmentHandle handle, FragmentState newState);
-}