Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/19 03:03:36 UTC

[1/7] drill git commit: DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.

Repository: drill
Updated Branches:
  refs/heads/master 238399de5 -> 9ec257efb


DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.
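
For context, a minimal sketch of the fix, using only identifiers that appear in the patch below. Hive's "numRows" table/partition property can exceed Integer.MAX_VALUE on large tables, and since hive-0.13 it is set to -1 when no statistics are available, so the value is parsed as a long and only trusted when positive:

    final String numRowsProp = properties.getProperty("numRows");
    if (numRowsProp != null) {
      // was Integer.valueOf(numRowsProp), which overflows once a single
      // split reports more than ~2.1 billion rows
      final long numRows = Long.valueOf(numRowsProp);
      // hive may report -1 (or a stale value) when statistics are missing
      if (numRows > 0) {
        rowCount += numRows;
      }
    }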


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf155464
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf155464
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf155464

Branch: refs/heads/master
Commit: cf1554641c07359e91a2836d245b0b07438d0cd5
Parents: 238399d
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Apr 16 09:08:00 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 09:11:11 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/hive/HiveScan.java  | 98 ++++++++++----------
 1 file changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/cf155464/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index b96fda4..92635a8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,10 @@ public class HiveScan extends AbstractGroupScan {
   private long rowCount = 0;
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
-                  @JsonProperty("storage-plugin") String storagePluginName,
-                  @JsonProperty("columns") List<SchemaPath> columns,
-                  @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+  public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+                  @JsonProperty("storage-plugin") final String storagePluginName,
+                  @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +106,7 @@ public class HiveScan extends AbstractGroupScan {
     endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
@@ -115,7 +115,7 @@ public class HiveScan extends AbstractGroupScan {
     this.storagePluginName = storagePlugin.getName();
   }
 
-  private HiveScan(HiveScan that) {
+  private HiveScan(final HiveScan that) {
     this.columns = that.columns;
     this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
@@ -133,14 +133,14 @@ public class HiveScan extends AbstractGroupScan {
 
   private void getSplits() throws ExecutionSetupException {
     try {
-      List<Partition> partitions = hiveReadEntry.getPartitions();
-      Table table = hiveReadEntry.getTable();
+      final List<Partition> partitions = hiveReadEntry.getPartitions();
+      final Table table = hiveReadEntry.getTable();
       if (partitions == null || partitions.size() == 0) {
-        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        final Properties properties = MetaStoreUtils.getTableMetadata(table);
         splitInput(properties, table.getSd(), null);
       } else {
-        for (Partition partition : partitions) {
-          Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+        for (final Partition partition : partitions) {
+          final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
           splitInput(properties, partition.getSd(), partition);
         }
       }
@@ -150,27 +150,27 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   /* Split the input given in StorageDescriptor */
-  private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+  private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
       throws ReflectiveOperationException, IOException {
-    JobConf job = new JobConf();
-    for (Object obj : properties.keySet()) {
+    final JobConf job = new JobConf();
+    for (final Object obj : properties.keySet()) {
       job.set((String) obj, (String) properties.get(obj));
     }
-    for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+    for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
       job.set(entry.getKey(), entry.getValue());
     }
     InputFormat<?, ?> format = (InputFormat<?, ?>)
         Class.forName(sd.getInputFormat()).getConstructor().newInstance();
     job.setInputFormat(format.getClass());
-    Path path = new Path(sd.getLocation());
-    FileSystem fs = path.getFileSystem(job);
+    final Path path = new Path(sd.getLocation());
+    final FileSystem fs = path.getFileSystem(job);
 
     // Use new JobConf that has FS configuration
-    JobConf jobWithFsConf = new JobConf(fs.getConf());
+    final JobConf jobWithFsConf = new JobConf(fs.getConf());
     if (fs.exists(path)) {
       FileInputFormat.addInputPath(jobWithFsConf, path);
       format = jobWithFsConf.getInputFormat();
-      for (InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+      for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
         inputSplits.add(split);
         partitionMap.put(split, partition);
       }
@@ -178,7 +178,7 @@ public class HiveScan extends AbstractGroupScan {
     final String numRowsProp = properties.getProperty("numRows");
     logger.trace("HiveScan num rows property = {}", numRowsProp);
     if (numRowsProp != null) {
-      final int numRows = Integer.valueOf(numRowsProp);
+      final long numRows = Long.valueOf(numRowsProp);
       // starting from hive-0.13, when no statistics are available, this property is set to -1
       // it's important to note that the value returned by hive may not be up to date
       if (numRows > 0) {
@@ -188,33 +188,33 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   @Override
-  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = Lists.newArrayList();
     for (int i = 0; i < endpoints.size(); i++) {
       mappings.add(new ArrayList<InputSplit>());
     }
-    int count = endpoints.size();
+    final int count = endpoints.size();
     for (int i = 0; i < inputSplits.size(); i++) {
       mappings.get(i % count).add(inputSplits.get(i));
     }
   }
 
-  public static String serializeInputSplit(InputSplit split) throws IOException {
-    ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
+  public static String serializeInputSplit(final InputSplit split) throws IOException {
+    final ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
     split.write(byteArrayOutputStream);
-    String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+    final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
     logger.debug("Encoded split string for split {} : {}", split, encoded);
     return encoded;
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+  public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
-      List<InputSplit> splits = mappings.get(minorFragmentId);
+      final List<InputSplit> splits = mappings.get(minorFragmentId);
       List<HivePartition> parts = Lists.newArrayList();
-      List<String> encodedInputSplits = Lists.newArrayList();
-      List<String> splitTypes = Lists.newArrayList();
-      for (InputSplit split : splits) {
+      final List<String> encodedInputSplits = Lists.newArrayList();
+      final List<String> splitTypes = Lists.newArrayList();
+      for (final InputSplit split : splits) {
         HivePartition partition = null;
         if (partitionMap.get(split) != null) {
           partition = new HivePartition(partitionMap.get(split));
@@ -226,7 +226,7 @@ public class HiveScan extends AbstractGroupScan {
       if (parts.contains(null)) {
         parts = null;
       }
-      HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
+      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
       return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
@@ -240,22 +240,22 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-    for (DrillbitEndpoint endpoint : endpoints) {
+    final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+    for (final DrillbitEndpoint endpoint : endpoints) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+    final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
         totalSize += Math.max(1, split.getLength());
       }
-      for (InputSplit split : inputSplits) {
-        float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
-        for (String loc : split.getLocations()) {
+      for (final InputSplit split : inputSplits) {
+        final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+        for (final String loc : split.getLocations()) {
           logger.debug("split location: {}", loc);
-          DrillbitEndpoint endpoint = endpointMap.get(loc);
+          final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
               affinityMap.get(endpoint).addAffinity(affinity);
@@ -265,13 +265,13 @@ public class HiveScan extends AbstractGroupScan {
           }
         }
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (DrillbitEndpoint ep : affinityMap.keySet()) {
+    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
       Preconditions.checkNotNull(ep);
     }
-    for (EndpointAffinity a : affinityMap.values()) {
+    for (final EndpointAffinity a : affinityMap.values()) {
       Preconditions.checkNotNull(a.getEndpoint());
     }
     return Lists.newArrayList(affinityMap.values());
@@ -281,7 +281,7 @@ public class HiveScan extends AbstractGroupScan {
   public ScanStats getScanStats() {
     try {
       long data =0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
           data += split.getLength();
       }
 
@@ -292,13 +292,13 @@ public class HiveScan extends AbstractGroupScan {
       }
       logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
       return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
     return new HiveScan(this);
   }
 
@@ -316,20 +316,20 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    HiveScan newScan = new HiveScan(this);
+  public GroupScan clone(final List<SchemaPath> columns) {
+    final HiveScan newScan = new HiveScan(this);
     newScan.columns = columns;
     return newScan;
   }
 
   @Override
-  public boolean canPushdownProjects(List<SchemaPath> columns) {
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
     return true;
   }
 
   // Return true if the current table is partitioned false otherwise
   public boolean supportsPartitionFilterPushdown() {
-    List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+    final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
     if (partitionKeys == null || partitionKeys.size() == 0) {
       return false;
     }


[3/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection

Posted by ja...@apache.org.
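
This commit adds a CANCELLATION_REQUESTED value to FragmentState, separating "cancellation was requested" from the terminal CANCELLED state. As a hypothetical illustration (not code from this patch), a caller of the generated enum might classify states like so:

    import org.apache.drill.exec.proto.UserBitShared.FragmentState;

    // Hypothetical helper: only FINISHED, CANCELLED and FAILED are terminal;
    // CANCELLATION_REQUESTED means the cancel was asked for but the fragment
    // has not yet acknowledged it.
    static boolean isTerminal(final FragmentState state) {
      switch (state) {
        case FINISHED:
        case CANCELLED:
        case FAILED:
          return true;
        default: // SENDING, AWAITING_ALLOCATION, RUNNING, CANCELLATION_REQUESTED
          return false;
      }
    }
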
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 84071c3..b1c3fe0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -36,9 +36,9 @@ public class RootFragmentManager implements FragmentManager{
   private final FragmentExecutor runner;
   private final FragmentHandle handle;
   private volatile boolean cancel = false;
-  private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+  private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
 
-  public RootFragmentManager(FragmentHandle handle, IncomingBuffers buffers, FragmentExecutor runner) {
+  public RootFragmentManager(final FragmentHandle handle, final IncomingBuffers buffers, final FragmentExecutor runner) {
     super();
     this.handle = handle;
     this.buffers = buffers;
@@ -46,11 +46,16 @@ public class RootFragmentManager implements FragmentManager{
   }
 
   @Override
-  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException {
+  public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
     return buffers.batchArrived(batch);
   }
 
   @Override
+  public void receivingFragmentFinished(final FragmentHandle handle) {
+    throw new IllegalStateException("The root fragment should not be sending any messages to receiver.");
+  }
+
+  @Override
   public FragmentExecutor getRunnable() {
     return runner;
   }
@@ -75,13 +80,13 @@ public class RootFragmentManager implements FragmentManager{
   }
 
   @Override
-  public void addConnection(RemoteConnection connection) {
+  public void addConnection(final RemoteConnection connection) {
     connections.add(connection);
   }
 
   @Override
-  public void setAutoRead(boolean autoRead) {
-    for (RemoteConnection c : connections) {
+  public void setAutoRead(final boolean autoRead) {
+    for (final RemoteConnection c : connections) {
       c.setAutoRead(autoRead);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
index 7155d43..0e888c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
@@ -17,26 +17,26 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 
-public class StateTransitionException extends DrillException {
+public class StateTransitionException extends DrillRuntimeException {
   public StateTransitionException() {
     super();
   }
 
-  public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+  public StateTransitionException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
     super(message, cause, enableSuppression, writableStackTrace);
   }
 
-  public StateTransitionException(String message, Throwable cause) {
+  public StateTransitionException(final String message, final Throwable cause) {
     super(message, cause);
   }
 
-  public StateTransitionException(String message) {
+  public StateTransitionException(final String message) {
     super(message);
   }
 
-  public StateTransitionException(Throwable cause) {
+  public StateTransitionException(final Throwable cause) {
     super(cause);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
index 2bba345..424e7fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
@@ -25,6 +25,6 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState;
  * The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman.
  */
 public interface StatusReporter {
-  void fail(FragmentHandle handle, String message, UserException excep);
+  void fail(FragmentHandle handle, UserException excep);
   void stateChanged(FragmentHandle handle, FragmentState newState);
 }
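
The updated StatusReporter drops the separate message parameter from fail(), since the message now travels inside the UserException itself. A minimal, purely illustrative implementation (a sketch, not Drill's actual foreman-backed reporter) could look like this:

    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    import org.apache.drill.exec.proto.UserBitShared.FragmentState;

    // Illustrative only: report status to a log instead of the foreman.
    public class LoggingStatusReporter implements StatusReporter {
      private static final org.slf4j.Logger logger =
          org.slf4j.LoggerFactory.getLogger(LoggingStatusReporter.class);

      @Override
      public void fail(final FragmentHandle handle, final UserException excep) {
        logger.error("Fragment {}:{} failed", handle.getMajorFragmentId(),
            handle.getMinorFragmentId(), excep);
      }

      @Override
      public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
        logger.info("Fragment {}:{} is now {}", handle.getMajorFragmentId(),
            handle.getMinorFragmentId(), newState);
      }
    }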

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 933417e..2536bbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.physical.impl;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContext.ExecutorState;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
@@ -40,16 +42,42 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   private final RecordBatch incoming;
   private final ScreenRoot screenRoot;
 
-  public SimpleRootExec(RootExec e) {
+  public SimpleRootExec(final RootExec e) {
     if (e instanceof ScreenRoot) {
       incoming = ((ScreenRoot)e).getIncoming();
       screenRoot = (ScreenRoot) e;
     } else {
       throw new UnsupportedOperationException();
     }
+    incoming.getContext().setExecutorState(new DummyExecutorState());
+  }
+
+  private class DummyExecutorState implements ExecutorState {
+    final DeferredException ex = new DeferredException();
+
+    @Override
+    public boolean shouldContinue() {
+      return !isFailed();
+    }
+
+    @Override
+    public void fail(final Throwable t) {
+      ex.addThrowable(t);
+    }
+
+    @Override
+    public boolean isFailed() {
+      return ex.getException() != null;
+    }
+
+    @Override
+    public Throwable getFailureCause() {
+      return ex.getException();
+    }
 
   }
 
+
   public FragmentContext getContext() {
     return incoming.getContext();
   }
@@ -63,8 +91,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @SuppressWarnings("unchecked")
-  public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass) {
-    TypedFieldId tfid = incoming.getValueVectorId(path);
+  public <T extends ValueVector> T getValueVectorById(final SchemaPath path, final Class<?> vvClass) {
+    final TypedFieldId tfid = incoming.getValueVectorId(path);
     return (T) incoming.getValueAccessorById(vvClass, tfid.getFieldIds()).getValueVector();
   }
 
@@ -86,14 +114,14 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @Override
-  public void receivingFragmentFinished(FragmentHandle handle) {
+  public void receivingFragmentFinished(final FragmentHandle handle) {
     //no op
   }
 
   @Override
   public Iterator<ValueVector> iterator() {
-    List<ValueVector> vv = Lists.newArrayList();
-    for (VectorWrapper<?> vw : incoming) {
+    final List<ValueVector> vv = Lists.newArrayList();
+    for (final VectorWrapper<?> vw : incoming) {
       vv.add(vw.getValueVector());
     }
     return vv.iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index b2e5740..b8336e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -27,10 +27,7 @@ import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.AckSender;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -84,6 +81,7 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     context = Mockito.mock(FragmentContext.class);
 
     Mockito.when(context.getConfig()).thenReturn(dc);
+    Mockito.when(context.shouldContinue()).thenReturn(true);
 
     rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index d9dba6e..eca19a0 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -2081,6 +2081,10 @@ public final class SchemaUserBitShared
                 if(message.hasEndpoint())
                     output.writeObject(9, message.getEndpoint(), org.apache.drill.exec.proto.SchemaCoordinationProtos.DrillbitEndpoint.WRITE, false);
 
+                if(message.hasLastUpdate())
+                    output.writeInt64(10, message.getLastUpdate(), false);
+                if(message.hasLastProgress())
+                    output.writeInt64(11, message.getLastProgress(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile message)
             {
@@ -2150,6 +2154,12 @@ public final class SchemaUserBitShared
                             builder.setEndpoint(input.mergeObject(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.newBuilder(), org.apache.drill.exec.proto.SchemaCoordinationProtos.DrillbitEndpoint.MERGE));
 
                             break;
+                        case 10:
+                            builder.setLastUpdate(input.readInt64());
+                            break;
+                        case 11:
+                            builder.setLastProgress(input.readInt64());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -2199,6 +2209,8 @@ public final class SchemaUserBitShared
                 case 7: return "memoryUsed";
                 case 8: return "maxMemoryUsed";
                 case 9: return "endpoint";
+                case 10: return "lastUpdate";
+                case 11: return "lastProgress";
                 default: return null;
             }
         }
@@ -2219,6 +2231,8 @@ public final class SchemaUserBitShared
             fieldMap.put("memoryUsed", 7);
             fieldMap.put("maxMemoryUsed", 8);
             fieldMap.put("endpoint", 9);
+            fieldMap.put("lastUpdate", 10);
+            fieldMap.put("lastProgress", 11);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index acd8624..a229450 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -236,6 +236,10 @@ public final class UserBitShared {
      * <code>FAILED = 5;</code>
      */
     FAILED(5, 5),
+    /**
+     * <code>CANCELLATION_REQUESTED = 6;</code>
+     */
+    CANCELLATION_REQUESTED(6, 6),
     ;
 
     /**
@@ -262,6 +266,10 @@ public final class UserBitShared {
      * <code>FAILED = 5;</code>
      */
     public static final int FAILED_VALUE = 5;
+    /**
+     * <code>CANCELLATION_REQUESTED = 6;</code>
+     */
+    public static final int CANCELLATION_REQUESTED_VALUE = 6;
 
 
     public final int getNumber() { return value; }
@@ -274,6 +282,7 @@ public final class UserBitShared {
         case 3: return FINISHED;
         case 4: return CANCELLED;
         case 5: return FAILED;
+        case 6: return CANCELLATION_REQUESTED;
         default: return null;
       }
     }
@@ -15705,6 +15714,26 @@ public final class UserBitShared {
      * <code>optional .exec.DrillbitEndpoint endpoint = 9;</code>
      */
     org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpointOrBuilder getEndpointOrBuilder();
+
+    // optional int64 last_update = 10;
+    /**
+     * <code>optional int64 last_update = 10;</code>
+     */
+    boolean hasLastUpdate();
+    /**
+     * <code>optional int64 last_update = 10;</code>
+     */
+    long getLastUpdate();
+
+    // optional int64 last_progress = 11;
+    /**
+     * <code>optional int64 last_progress = 11;</code>
+     */
+    boolean hasLastProgress();
+    /**
+     * <code>optional int64 last_progress = 11;</code>
+     */
+    long getLastProgress();
   }
   /**
    * Protobuf type {@code exec.shared.MinorFragmentProfile}
@@ -15827,6 +15856,16 @@ public final class UserBitShared {
               bitField0_ |= 0x00000080;
               break;
             }
+            case 80: {
+              bitField0_ |= 0x00000100;
+              lastUpdate_ = input.readInt64();
+              break;
+            }
+            case 88: {
+              bitField0_ |= 0x00000200;
+              lastProgress_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -16046,6 +16085,38 @@ public final class UserBitShared {
       return endpoint_;
     }
 
+    // optional int64 last_update = 10;
+    public static final int LAST_UPDATE_FIELD_NUMBER = 10;
+    private long lastUpdate_;
+    /**
+     * <code>optional int64 last_update = 10;</code>
+     */
+    public boolean hasLastUpdate() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional int64 last_update = 10;</code>
+     */
+    public long getLastUpdate() {
+      return lastUpdate_;
+    }
+
+    // optional int64 last_progress = 11;
+    public static final int LAST_PROGRESS_FIELD_NUMBER = 11;
+    private long lastProgress_;
+    /**
+     * <code>optional int64 last_progress = 11;</code>
+     */
+    public boolean hasLastProgress() {
+      return ((bitField0_ & 0x00000200) == 0x00000200);
+    }
+    /**
+     * <code>optional int64 last_progress = 11;</code>
+     */
+    public long getLastProgress() {
+      return lastProgress_;
+    }
+
     private void initFields() {
       state_ = org.apache.drill.exec.proto.UserBitShared.FragmentState.SENDING;
       error_ = org.apache.drill.exec.proto.UserBitShared.DrillPBError.getDefaultInstance();
@@ -16056,6 +16127,8 @@ public final class UserBitShared {
       memoryUsed_ = 0L;
       maxMemoryUsed_ = 0L;
       endpoint_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.getDefaultInstance();
+      lastUpdate_ = 0L;
+      lastProgress_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16096,6 +16169,12 @@ public final class UserBitShared {
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         output.writeMessage(9, endpoint_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeInt64(10, lastUpdate_);
+      }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        output.writeInt64(11, lastProgress_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -16141,6 +16220,14 @@ public final class UserBitShared {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(9, endpoint_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(10, lastUpdate_);
+      }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(11, lastProgress_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -16290,6 +16377,10 @@ public final class UserBitShared {
           endpointBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000100);
+        lastUpdate_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000200);
+        lastProgress_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
 
@@ -16367,6 +16458,14 @@ public final class UserBitShared {
         } else {
           result.endpoint_ = endpointBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.lastUpdate_ = lastUpdate_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000200;
+        }
+        result.lastProgress_ = lastProgress_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -16433,6 +16532,12 @@ public final class UserBitShared {
         if (other.hasEndpoint()) {
           mergeEndpoint(other.getEndpoint());
         }
+        if (other.hasLastUpdate()) {
+          setLastUpdate(other.getLastUpdate());
+        }
+        if (other.hasLastProgress()) {
+          setLastProgress(other.getLastProgress());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -17135,6 +17240,72 @@ public final class UserBitShared {
         return endpointBuilder_;
       }
 
+      // optional int64 last_update = 10;
+      private long lastUpdate_ ;
+      /**
+       * <code>optional int64 last_update = 10;</code>
+       */
+      public boolean hasLastUpdate() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional int64 last_update = 10;</code>
+       */
+      public long getLastUpdate() {
+        return lastUpdate_;
+      }
+      /**
+       * <code>optional int64 last_update = 10;</code>
+       */
+      public Builder setLastUpdate(long value) {
+        bitField0_ |= 0x00000200;
+        lastUpdate_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 last_update = 10;</code>
+       */
+      public Builder clearLastUpdate() {
+        bitField0_ = (bitField0_ & ~0x00000200);
+        lastUpdate_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 last_progress = 11;
+      private long lastProgress_ ;
+      /**
+       * <code>optional int64 last_progress = 11;</code>
+       */
+      public boolean hasLastProgress() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional int64 last_progress = 11;</code>
+       */
+      public long getLastProgress() {
+        return lastProgress_;
+      }
+      /**
+       * <code>optional int64 last_progress = 11;</code>
+       */
+      public Builder setLastProgress(long value) {
+        bitField0_ |= 0x00000400;
+        lastProgress_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 last_progress = 11;</code>
+       */
+      public Builder clearLastProgress() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        lastProgress_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.shared.MinorFragmentProfile)
     }
 
@@ -19954,7 +20125,7 @@ public final class UserBitShared {
       "jorFragmentProfile\"t\n\024MajorFragmentProfi" +
       "le\022\031\n\021major_fragment_id\030\001 \001(\005\022A\n\026minor_f" +
       "ragment_profile\030\002 \003(\0132!.exec.shared.Mino" +
-      "rFragmentProfile\"\274\002\n\024MinorFragmentProfil" +
+      "rFragmentProfile\"\350\002\n\024MinorFragmentProfil" +
       "e\022)\n\005state\030\001 \001(\0162\032.exec.shared.FragmentS" +
       "tate\022(\n\005error\030\002 \001(\0132\031.exec.shared.DrillP" +
       "BError\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020ope" +
@@ -19962,42 +20133,44 @@ public final class UserBitShared {
       "orProfile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_tim" +
       "e\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memo",
       "ry_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.D" +
-      "rillbitEndpoint\"\377\001\n\017OperatorProfile\0221\n\ri" +
-      "nput_profile\030\001 \003(\0132\032.exec.shared.StreamP" +
-      "rofile\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_" +
-      "type\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rproce" +
-      "ss_nanos\030\006 \001(\003\022#\n\033peak_local_memory_allo" +
-      "cated\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.share" +
-      "d.MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStr" +
-      "eamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002" +
-      " \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\t",
-      "metric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014d" +
-      "ouble_value\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_C" +
-      "ONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQuer" +
-      "yType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020" +
-      "\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
-      "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
-      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\335\005\n\020CoreO" +
-      "peratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADC" +
-      "AST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGA" +
-      "TE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025H",
-      "ASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MER" +
-      "GING_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SEN" +
-      "DER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER" +
-      "\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELE" +
-      "CTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGR" +
-      "EGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT" +
-      "\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032" +
-      "\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_S" +
-      "CAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB" +
-      "_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SU",
-      "B_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SC" +
-      "AN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_S" +
-      "UB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUC" +
-      "ER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIN" +
-      "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_" +
-      "SCAN\020$B.\n\033org.apache.drill.exec.protoB\rU" +
-      "serBitSharedH\001"
+      "rillbitEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\r" +
+      "last_progress\030\013 \001(\003\"\377\001\n\017OperatorProfile\022" +
+      "1\n\rinput_profile\030\001 \003(\0132\032.exec.shared.Str" +
+      "eamProfile\022\023\n\013operator_id\030\003 \001(\005\022\025\n\ropera" +
+      "tor_type\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rp" +
+      "rocess_nanos\030\006 \001(\003\022#\n\033peak_local_memory_" +
+      "allocated\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.s" +
+      "hared.MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n" +
+      "\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batch",
+      "es\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue" +
+      "\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022" +
+      "\024\n\014double_value\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013B" +
+      "IT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\t" +
+      "QueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSI" +
+      "CAL\020\003*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023" +
+      "AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FI" +
+      "NISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026C" +
+      "ANCELLATION_REQUESTED\020\006*\335\005\n\020CoreOperator" +
+      "Type\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SEN",
+      "DER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n" +
+      "\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PAR" +
+      "TITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RE" +
+      "CEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013" +
+      "\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014R" +
+      "ANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_V" +
+      "ECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017" +
+      "\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005T" +
+      "RACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQU" +
+      "ET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025",
+      "\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030" +
+      "\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020" +
+      "\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n" +
+      "\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN" +
+      "\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONS" +
+      "UMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024" +
+      "\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$B" +
+      ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+      "haredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -20099,7 +20272,7 @@ public final class UserBitShared {
           internal_static_exec_shared_MinorFragmentProfile_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_shared_MinorFragmentProfile_descriptor,
-              new java.lang.String[] { "State", "Error", "MinorFragmentId", "OperatorProfile", "StartTime", "EndTime", "MemoryUsed", "MaxMemoryUsed", "Endpoint", });
+              new java.lang.String[] { "State", "Error", "MinorFragmentId", "OperatorProfile", "StartTime", "EndTime", "MemoryUsed", "MaxMemoryUsed", "Endpoint", "LastUpdate", "LastProgress", });
           internal_static_exec_shared_OperatorProfile_descriptor =
             getDescriptor().getMessageTypes().get(16);
           internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
index ba536fc..4d3779b 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
@@ -27,7 +27,8 @@ public enum FragmentState implements com.dyuproject.protostuff.EnumLite<Fragment
     RUNNING(2),
     FINISHED(3),
     CANCELLED(4),
-    FAILED(5);
+    FAILED(5),
+    CANCELLATION_REQUESTED(6);
     
     public final int number;
     
@@ -51,6 +52,7 @@ public enum FragmentState implements com.dyuproject.protostuff.EnumLite<Fragment
             case 3: return FINISHED;
             case 4: return CANCELLED;
             case 5: return FAILED;
+            case 6: return CANCELLATION_REQUESTED;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
index 5cd71f9..fb381a8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
@@ -58,6 +58,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
     private long memoryUsed;
     private long maxMemoryUsed;
     private DrillbitEndpoint endpoint;
+    private long lastUpdate;
+    private long lastProgress;
 
     public MinorFragmentProfile()
     {
@@ -183,6 +185,32 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
         return this;
     }
 
+    // lastUpdate
+
+    public long getLastUpdate()
+    {
+        return lastUpdate;
+    }
+
+    public MinorFragmentProfile setLastUpdate(long lastUpdate)
+    {
+        this.lastUpdate = lastUpdate;
+        return this;
+    }
+
+    // lastProgress
+
+    public long getLastProgress()
+    {
+        return lastProgress;
+    }
+
+    public MinorFragmentProfile setLastProgress(long lastProgress)
+    {
+        this.lastProgress = lastProgress;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -269,6 +297,12 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
                     message.endpoint = input.mergeObject(message.endpoint, DrillbitEndpoint.getSchema());
                     break;
 
+                case 10:
+                    message.lastUpdate = input.readInt64();
+                    break;
+                case 11:
+                    message.lastProgress = input.readInt64();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -313,6 +347,12 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
         if(message.endpoint != null)
              output.writeObject(9, message.endpoint, DrillbitEndpoint.getSchema(), false);
 
+
+        if(message.lastUpdate != 0)
+            output.writeInt64(10, message.lastUpdate, false);
+
+        if(message.lastProgress != 0)
+            output.writeInt64(11, message.lastProgress, false);
     }
 
     public String getFieldName(int number)
@@ -328,6 +368,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
             case 7: return "memoryUsed";
             case 8: return "maxMemoryUsed";
             case 9: return "endpoint";
+            case 10: return "lastUpdate";
+            case 11: return "lastProgress";
             default: return null;
         }
     }
@@ -350,6 +392,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
         __fieldMap.put("memoryUsed", 7);
         __fieldMap.put("maxMemoryUsed", 8);
         __fieldMap.put("endpoint", 9);
+        __fieldMap.put("lastUpdate", 10);
+        __fieldMap.put("lastProgress", 11);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 7383bd2..a17dbc7 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -208,6 +208,8 @@ message MinorFragmentProfile {
   optional int64 memory_used = 7;
   optional int64 max_memory_used = 8;
   optional DrillbitEndpoint endpoint = 9;
+  optional int64 last_update = 10;
+  optional int64 last_progress = 11;
 }
 
 message OperatorProfile {
@@ -240,6 +242,7 @@ enum FragmentState {
   FINISHED = 3;
   CANCELLED = 4;
   FAILED = 5;
+  CANCELLATION_REQUESTED = 6;
 }
 
 enum CoreOperatorType {
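
The two new MinorFragmentProfile fields separate "the fragment last reported status" (last_update) from "the fragment last made forward progress" (last_progress). A hypothetical usage example with the generated protostuff bean from this patch (assuming millisecond timestamps):

    import org.apache.drill.exec.proto.beans.MinorFragmentProfile;

    final MinorFragmentProfile profile = new MinorFragmentProfile();
    final long now = System.currentTimeMillis();
    profile.setLastUpdate(now);    // heartbeat: some status was reported
    profile.setLastProgress(now);  // actual work: records moved forward
    // A monitor could flag a stalled fragment when lastUpdate keeps advancing
    // while now - profile.getLastProgress() exceeds a stall threshold.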


[5/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4b317e0..7b9fffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,9 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.List;
 
@@ -63,18 +61,15 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.sun.codemodel.JExpr;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -85,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
-  private boolean buildingSchema = true;
+  private final boolean buildingSchema = true;
 
   private static final String EMPTY_STRING = "";
   private boolean first = true;
@@ -96,7 +91,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     public String prefix = "";
     public HashMap<String, Integer> prefixMap = Maps.newHashMap();
     public CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
-    private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+    private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
 
     private void clear() {
       isStar = false;
@@ -109,7 +104,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
   }
 
@@ -120,7 +115,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     super.killIncoming(sendUpstream);
     hasRemainder = false;
   }
@@ -157,7 +152,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         if (next == IterOutcome.OK_NEW_SCHEMA) {
           try {
             setupNewSchema();
-          } catch (SchemaChangeException e) {
+          } catch (final SchemaChangeException e) {
             throw new RuntimeException(e);
           }
         }
@@ -172,7 +167,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return IterOutcome.OUT_OF_MEMORY;
     }
 
-    int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
+    final int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
     if (outputRecords < incomingRecordCount) {
       setValueCount(outputRecords);
       hasRemainder = true;
@@ -180,7 +175,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       this.recordCount = remainderIndex;
     } else {
       setValueCount(incomingRecordCount);
-      for(VectorWrapper<?> v: incoming) {
+      for(final VectorWrapper<?> v: incoming) {
         v.clear();
       }
       this.recordCount = outputRecords;
@@ -195,12 +190,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   private void handleRemainder() {
-    int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
+    final int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
     if (!doAlloc()) {
       outOfMemory = true;
       return;
     }
-    int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
+    final int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
       this.recordCount = projRecords;
@@ -209,7 +204,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       setValueCount(remainingRecordCount);
       hasRemainder = false;
       remainderIndex = 0;
-      for (VectorWrapper<?> v : incoming) {
+      for (final VectorWrapper<?> v : incoming) {
         v.clear();
       }
       this.recordCount = remainingRecordCount;
@@ -221,13 +216,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  public void addComplexWriter(ComplexWriter writer) {
+  public void addComplexWriter(final ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
   private boolean doAlloc() {
     //Allocate vv in the allocationVectors.
-    for (ValueVector v : this.allocationVectors) {
+    for (final ValueVector v : this.allocationVectors) {
       AllocationHelper.allocateNew(v, incoming.getRecordCount());
     }
 
@@ -236,16 +231,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return true;
     }
 
-    for (ComplexWriter writer : complexWriters) {
+    for (final ComplexWriter writer : complexWriters) {
       writer.allocate();
     }
 
     return true;
   }
 
-  private void setValueCount(int count) {
-    for (ValueVector v : allocationVectors) {
-      ValueVector.Mutator m = v.getMutator();
+  private void setValueCount(final int count) {
+    for (final ValueVector v : allocationVectors) {
+      final ValueVector.Mutator m = v.getMutator();
       m.setValueCount(count);
     }
 
@@ -253,15 +248,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return;
     }
 
-    for (ComplexWriter writer : complexWriters) {
+    for (final ComplexWriter writer : complexWriters) {
       writer.setValueCount(count);
     }
   }
 
   /** hack to make ref and full work together... need to figure out if this is still necessary. **/
-  private FieldReference getRef(NamedExpression e) {
-    FieldReference ref = e.getRef();
-    PathSegment seg = ref.getRootSegment();
+  private FieldReference getRef(final NamedExpression e) {
+    final FieldReference ref = e.getRef();
+    final PathSegment seg = ref.getRootSegment();
 
 //    if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
 //      return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
@@ -269,8 +264,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return ref;
   }
 
-  private boolean isAnyWildcard(List<NamedExpression> exprs) {
-    for (NamedExpression e : exprs) {
+  private boolean isAnyWildcard(final List<NamedExpression> exprs) {
+    for (final NamedExpression e : exprs) {
       if (isWildcard(e)) {
         return true;
       }
@@ -278,18 +273,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return false;
   }
 
-  private boolean isWildcard(NamedExpression ex) {
+  private boolean isWildcard(final NamedExpression ex) {
     if ( !(ex.getExpr() instanceof SchemaPath)) {
       return false;
     }
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
     return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     if (allocationVectors != null) {
-      for (ValueVector v : allocationVectors) {
+      for (final ValueVector v : allocationVectors) {
         v.clear();
       }
     }
@@ -305,12 +300,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
 
-    IntOpenHashSet transferFieldIds = new IntOpenHashSet();
+    final IntOpenHashSet transferFieldIds = new IntOpenHashSet();
 
-    boolean isAnyWildcard = isAnyWildcard(exprs);
+    final boolean isAnyWildcard = isAnyWildcard(exprs);
 
-    ClassifierResult result = new ClassifierResult();
-    boolean classify = isClassificationNeeded(exprs);
+    final ClassifierResult result = new ClassifierResult();
+    final boolean classify = isClassificationNeeded(exprs);
 
     for (int i = 0; i < exprs.size(); i++) {
       final NamedExpression namedExpression = exprs.get(i);
@@ -321,33 +316,33 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
         if (result.isStar) {
           // The value indicates which wildcard we are processing now
-          Integer value = result.prefixMap.get(result.prefix);
+          final Integer value = result.prefixMap.get(result.prefix);
           if (value != null && value.intValue() == 1) {
             int k = 0;
-            for (VectorWrapper<?> wrapper : incoming) {
-              ValueVector vvIn = wrapper.getValueVector();
-              SchemaPath originalPath = vvIn.getField().getPath();
+            for (final VectorWrapper<?> wrapper : incoming) {
+              final ValueVector vvIn = wrapper.getValueVector();
+              final SchemaPath originalPath = vvIn.getField().getPath();
               if (k > result.outputNames.size()-1) {
                 assert false;
               }
-              String name = result.outputNames.get(k++);  // get the renamed column names
+              final String name = result.outputNames.get(k++);  // get the renamed column names
               if (name == EMPTY_STRING) {
                 continue;
               }
-              FieldReference ref = new FieldReference(name);
-              ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
-              TransferPair tp = vvIn.makeTransferPair(vvOut);
+              final FieldReference ref = new FieldReference(name);
+              final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+              final TransferPair tp = vvIn.makeTransferPair(vvOut);
               transfers.add(tp);
             }
           } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
             int k = 0;
-            for (VectorWrapper<?> wrapper : incoming) {
-              ValueVector vvIn = wrapper.getValueVector();
-              SchemaPath originalPath = vvIn.getField().getPath();
+            for (final VectorWrapper<?> wrapper : incoming) {
+              final ValueVector vvIn = wrapper.getValueVector();
+              final SchemaPath originalPath = vvIn.getField().getPath();
               if (k > result.outputNames.size()-1) {
                 assert false;
               }
-              String name = result.outputNames.get(k++);  // get the renamed column names
+              final String name = result.outputNames.get(k++);  // get the renamed column names
               if (name == EMPTY_STRING) {
                 continue;
               }
@@ -357,12 +352,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
 
-              MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
-              ValueVector vv = container.addOrGet(outputField, callBack);
+              final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
+              final ValueVector vv = container.addOrGet(outputField, callBack);
               allocationVectors.add(vv);
-              TypedFieldId fid = container.getValueVectorId(outputField.getPath());
-              ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
-              HoldingContainer hc = cg.addExpr(write);
+              final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+              final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+              final HoldingContainer hc = cg.addExpr(write);
             }
           }
           continue;
@@ -371,7 +366,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         // For the columns which do not need to be classified,
         // it is still necessary to ensure the output column name is unique
         result.outputNames = Lists.newArrayList();
-        String outputName = getRef(namedExpression).getRootSegment().getPath();
+        final String outputName = getRef(namedExpression).getRootSegment().getPath();
         addToResultMaps(outputName, result, true);
       }
 
@@ -403,17 +398,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           && !isAnyWildcard
           && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
 
-        ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
-        TypedFieldId id = vectorRead.getFieldId();
-        ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+        final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+        final TypedFieldId id = vectorRead.getFieldId();
+        final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
         Preconditions.checkNotNull(incoming);
 
-        FieldReference ref = getRef(namedExpression);
-        ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
-        TransferPair tp = vvIn.makeTransferPair(vvOut);
+        final FieldReference ref = getRef(namedExpression);
+        final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+        final TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
-        logger.debug("Added transfer for project expression.");
       } else if (expr instanceof DrillFuncHolderExpr &&
           ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder())  {
         // Need to process ComplexWriter function evaluation.
@@ -429,12 +423,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         cg.addExpr(expr);
       } else{
         // need to do evaluation.
-        ValueVector vector = container.addOrGet(outputField, callBack);
+        final ValueVector vector = container.addOrGet(outputField, callBack);
         allocationVectors.add(vector);
-        TypedFieldId fid = container.getValueVectorId(outputField.getPath());
-        boolean useSetSafe = !(vector instanceof FixedWidthVector);
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
-        HoldingContainer hc = cg.addExpr(write);
+        final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
+        final HoldingContainer hc = cg.addExpr(write);
 
         logger.debug("Added eval for project expression.");
       }
@@ -459,12 +453,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return popConfig.getExprs();
     }
 
-    List<NamedExpression> exprs = Lists.newArrayList();
-    for (MaterializedField field : incoming.getSchema()) {
+    final List<NamedExpression> exprs = Lists.newArrayList();
+    for (final MaterializedField field : incoming.getSchema()) {
       if (Types.isComplex(field.getType()) || Types.isRepeated(field.getType())) {
-        LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
-        String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
-        List<LogicalExpression> castArgs = Lists.newArrayList();
+        final LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
+        final String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
+        final List<LogicalExpression> castArgs = Lists.newArrayList();
         castArgs.add(convertToJson);  //input_expr
         /*
          * We are implicitly casting to VARCHAR so we don't have a max length,
@@ -472,7 +466,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
          * to the actual size so this size doesn't really matter.
          */
         castArgs.add(new ValueExpressions.LongExpression(TypeHelper.VARCHAR_DEFAULT_CAST_LEN, null)); //
-        FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
+        final FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
         exprs.add(new NamedExpression(castCall, new FieldReference(field.getPath())));
       } else {
         exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath())));
@@ -481,17 +475,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return exprs;
   }
 
-  private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+  private boolean isClassificationNeeded(final List<NamedExpression> exprs) {
     boolean needed = false;
     for (int i = 0; i < exprs.size(); i++) {
       final NamedExpression ex = exprs.get(i);
       if (!(ex.getExpr() instanceof SchemaPath)) {
         continue;
       }
-      NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
-      NameSegment ref = ex.getRef().getRootSegment();
-      boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-      boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+      final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+      final NameSegment ref = ex.getRef().getRootSegment();
+      final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+      final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
 
       if (refHasPrefix || exprContainsStar) {
         needed = true;
@@ -501,16 +495,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return needed;
   }
 
-  private String getUniqueName(String name, ClassifierResult result) {
-    Integer currentSeq = (Integer) result.sequenceMap.get(name);
+  private String getUniqueName(final String name, final ClassifierResult result) {
+    final Integer currentSeq = (Integer) result.sequenceMap.get(name);
     if (currentSeq == null) { // name is unique, so return the original name
-      Integer n = -1;
+      final Integer n = -1;
       result.sequenceMap.put(name, n);
       return name;
     }
     // create a new name
-    Integer newSeq = currentSeq + 1;
-    String newName = name + newSeq;
+    final Integer newSeq = currentSeq + 1;
+    final String newName = name + newSeq;
     result.sequenceMap.put(name, newSeq);
     result.sequenceMap.put(newName, -1);
 
@@ -527,7 +521,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   *                            to ensure uniqueness
   * @param allowDupsWithRename if the original name has already been used, whether renaming is allowed to keep the output name unique
   */
-  private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) {
+  private void addToResultMaps(final String origName, final ClassifierResult result, final boolean allowDupsWithRename) {
     String name = origName;
     if (allowDupsWithRename) {
       name = getUniqueName(origName, result);
@@ -540,22 +534,22 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result)  {
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
-    NameSegment ref = ex.getRef().getRootSegment();
-    boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
-    boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
+  private void classifyExpr(final NamedExpression ex, final RecordBatch incoming, final ClassifierResult result)  {
+    final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    final NameSegment ref = ex.getRef().getRootSegment();
+    final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+    final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+    final boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
+    final boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
+    final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+    final boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
 
     String exprPrefix = EMPTY_STRING;
     String exprSuffix = expr.getPath();
 
     if (exprHasPrefix) {
       // get the prefix of the expr
-      String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(exprComponents.length == 2);
       exprPrefix = exprComponents[0];
       exprSuffix = exprComponents[1];
@@ -565,18 +559,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     boolean exprIsFirstWildcard = false;
     if (exprContainsStar) {
       result.isStar = true;
-      Integer value = (Integer) result.prefixMap.get(exprPrefix);
+      final Integer value = (Integer) result.prefixMap.get(exprPrefix);
       if (value == null) {
-        Integer n = 1;
+        final Integer n = 1;
         result.prefixMap.put(exprPrefix, n);
         exprIsFirstWildcard = true;
       } else {
-        Integer n = value + 1;
+        final Integer n = value + 1;
         result.prefixMap.put(exprPrefix, n);
       }
     }
 
-    int incomingSchemaSize = incoming.getSchema().getFieldCount();
+    final int incomingSchemaSize = incoming.getSchema().getFieldCount();
 
     // for debugging..
     // if (incomingSchemaSize > 9) {
@@ -585,16 +579,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     // input is '*' and output is 'prefix_*'
     if (exprIsStar && refHasPrefix && refEndsWithStar) {
-      String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(components.length == 2);
-      String prefix = components[0];
+      final String prefix = components[0];
       result.outputNames = Lists.newArrayList();
-      for(VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getPath().getRootSegment().getPath();
+      for(final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String name = vvIn.getField().getPath().getRootSegment().getPath();
 
         // add the prefix to the incoming column name
-        String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
+        final String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
         addToResultMaps(newName, result, false);
       }
     }
@@ -609,19 +603,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           result.outputNames.add(EMPTY_STRING);  // initialize
         }
 
-        for (VectorWrapper<?> wrapper : incoming) {
-          ValueVector vvIn = wrapper.getValueVector();
-          String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+        for (final VectorWrapper<?> wrapper : incoming) {
+          final ValueVector vvIn = wrapper.getValueVector();
+          final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
           // get the prefix of the name
-          String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+          final String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
           // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it
           if (nameComponents.length <= 1) {
             k++;
             continue;
           }
-          String namePrefix = nameComponents[0];
+          final String namePrefix = nameComponents[0];
           if (exprPrefix.equals(namePrefix)) {
-            String newName = incomingName;
+            final String newName = incomingName;
             if (!result.outputMap.containsKey(newName)) {
               result.outputNames.set(k, newName);
               result.outputMap.put(newName,  newName);
@@ -632,9 +626,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       } else {
         result.outputNames = Lists.newArrayList();
         if (exprContainsStar) {
-          for (VectorWrapper<?> wrapper : incoming) {
-            ValueVector vvIn = wrapper.getValueVector();
-            String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+          for (final VectorWrapper<?> wrapper : incoming) {
+            final ValueVector vvIn = wrapper.getValueVector();
+            final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
             if (refContainsStar) {
               addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
             } else {
@@ -642,7 +636,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             }
           }
         } else {
-          String newName = expr.getPath();
+          final String newName = expr.getPath();
           if (!refHasPrefix && !exprHasPrefix) {
             addToResultMaps(newName, result, true); // allow dups since this is likely top-level project
           } else {
@@ -655,9 +649,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // input is wildcard and it is not the first wildcard
     else if(exprIsStar) {
       result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
         addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
       }
     }
@@ -665,7 +659,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // only the output has prefix
     else if (!exprHasPrefix && refHasPrefix) {
       result.outputNames = Lists.newArrayList();
-      String newName = ref.getPath();
+      final String newName = ref.getPath();
       addToResultMaps(newName, result, false);
     }
     // input has prefix but output does not
@@ -676,24 +670,24 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         result.outputNames.add(EMPTY_STRING);  // initialize
       }
 
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getPath().getRootSegment().getPath();
-        String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String name = vvIn.getField().getPath().getRootSegment().getPath();
+        final String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
         if (components.length <= 1)  {
           k++;
           continue;
         }
-        String namePrefix = components[0];
-        String nameSuffix = components[1];
+        final String namePrefix = components[0];
+        final String nameSuffix = components[1];
         if (exprPrefix.equals(namePrefix)) {
           if (refContainsStar) {
             // remove the prefix from the incoming column names
-            String newName = getUniqueName(nameSuffix, result);  // for top level we need to make names unique
+            final String newName = getUniqueName(nameSuffix, result);  // for top level we need to make names unique
             result.outputNames.set(k, newName);
           } else if (exprSuffix.equals(nameSuffix)) {
             // example: ref: $f1, expr: T0<PREFIX><column_name>
-            String newName = ref.getPath();
+            final String newName = ref.getPath();
             result.outputNames.set(k, newName);
           }
         } else {
@@ -704,7 +698,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // input and output have prefixes although they could be different...
     else if (exprHasPrefix && refHasPrefix) {
-      String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(input.length == 2);
       assert false : "Unexpected project expression or reference";  // not handled yet
     }
@@ -713,11 +707,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       // then we just want to pick the ref name as the output column name
 
       result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
         if (expr.getPath().equals(incomingName)) {
-          String newName = ref.getPath();
+          final String newName = ref.getPath();
           addToResultMaps(newName, result, true);
         }
       }

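A note for readers of the ProjectRecordBatch hunks above: the renaming scheme
in getUniqueName() is easiest to see in isolation. The first use of a name is
kept unchanged and recorded with a -1 sentinel; every later duplicate appends
the next sequence number. Below is a minimal, self-contained sketch of that
scheme; the class name and the main() harness are illustrative only, not part
of Drill.

    import java.util.HashMap;
    import java.util.Map;

    public class UniqueNameSketch {
      // name -> last sequence number handed out; -1 means "seen once, no suffix yet"
      private final Map<String, Integer> sequenceMap = new HashMap<>();

      public String getUniqueName(final String name) {
        final Integer currentSeq = sequenceMap.get(name);
        if (currentSeq == null) {           // name is unique, so return the original name
          sequenceMap.put(name, -1);
          return name;
        }
        final int newSeq = currentSeq + 1;  // duplicate: append a sequence number
        final String newName = name + newSeq;
        sequenceMap.put(name, newSeq);
        sequenceMap.put(newName, -1);       // reserve the generated name as well
        return newName;
      }

      public static void main(final String[] args) {
        final UniqueNameSketch s = new UniqueNameSketch();
        System.out.println(s.getUniqueName("col"));  // col
        System.out.println(s.getUniqueName("col"));  // col0
        System.out.println(s.getUniqueName("col"));  // col1
      }
    }
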
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 389d668..094865e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -27,9 +27,9 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -53,13 +53,13 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
 public class UnorderedReceiverBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
 
-  private RecordBatchLoader batchLoader;
-  private RawFragmentBatchProvider fragProvider;
-  private FragmentContext context;
+  private final RecordBatchLoader batchLoader;
+  private final RawFragmentBatchProvider fragProvider;
+  private final FragmentContext context;
   private BatchSchema schema;
-  private OperatorStats stats;
+  private final OperatorStats stats;
   private boolean first = true;
-  private UnorderedReceiver config;
+  private final UnorderedReceiver config;
   OperatorContext oContext;
 
   public enum Metric implements MetricDef {
@@ -72,7 +72,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
     }
   }
 
-  public UnorderedReceiverBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws OutOfMemoryException {
+  public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
@@ -101,7 +101,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
     }
@@ -124,12 +124,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return batchLoader.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
@@ -154,7 +154,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
       if (batch == null) {
         batchLoader.clear();
-        if (context.isCancelled()) {
+        if (!context.shouldContinue()) {
           return IterOutcome.STOP;
         }
         return IterOutcome.NONE;
@@ -167,8 +167,8 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
 //      logger.debug("Next received batch {}", batch);
 
-      RecordBatchDef rbd = batch.getHeader().getDef();
-      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+      final RecordBatchDef rbd = batch.getHeader().getDef();
+      final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
 
       batch.release();
@@ -206,15 +206,16 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   private void informSenders() {
-    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+    logger.info("Informing senders of request to terminate sending.");
+    final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
-      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+    for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+      final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
               .setMinorFragmentId(providingEndpoint.getId())
               .build();
-      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+      final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
@@ -225,12 +226,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
-    public void failed(RpcException ex) {
+    public void failed(final RpcException ex) {
       logger.warn("Failed to inform upstream that receiver is finished");
     }
 
     @Override
-    public void success(Ack value, ByteBuf buffer) {
+    public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
   }

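The recurring change in this commit, context.isCancelled() becoming
!context.shouldContinue(), folds cancellation and fragment failure into a
single check that operators poll at loop boundaries and after long-running
calls (the same check reappears after mSorter.sort() in the ExternalSortBatch
hunk below). A hedged sketch of the idiom; the nested interface is a stand-in
for the relevant slice of Drill's FragmentContext, not its real API:

    public class PollingLoopSketch {
      interface CancellationContext {
        boolean shouldContinue();  // false once the fragment is cancelled or has failed
      }

      enum IterOutcome { OK, STOP }

      static IterOutcome processBatches(final CancellationContext context, final int batches) {
        for (int i = 0; i < batches; i++) {
          if (!context.shouldContinue()) {
            return IterOutcome.STOP;  // bail out promptly instead of finishing the work
          }
          // ... process one batch ...
        }
        return IterOutcome.OK;
      }

      public static void main(final String[] args) {
        System.out.println(processBatches(() -> true, 3));   // OK
        System.out.println(processBatches(() -> false, 3));  // STOP
      }
    }
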
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index bd3c4e7..95d062c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -347,6 +347,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
         mSorter.sort(this.container);
 
+        // sort may have exited prematurely because shouldContinue() returned false.
+        if (!context.shouldContinue()) {
+          return IterOutcome.STOP;
+        }
         sv4 = mSorter.getSV4();
 
         long t = watch.elapsed(TimeUnit.MICROSECONDS);

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 94bc3a3..9b97e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -41,19 +41,20 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
   private long compares;
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
   private Queue<Integer> newRunStarts;
-
+  private FragmentContext context;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
+  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
     this.vector4 = vector4.createNewWrapperCurrent();
+    this.context = context;
     vector4.clear();
     doSetup(context, hyperBatch, null);
     runStarts.add(0);
     int batch = 0;
     for (int i = 0; i < this.vector4.getTotalCount(); i++) {
-      int newBatch = this.vector4.get(i) >>> 16;
+      final int newBatch = this.vector4.get(i) >>> 16;
       if (newBatch == batch) {
         continue;
       } else if(newBatch == batch + 1) {
@@ -63,7 +64,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
         throw new UnsupportedOperationException("Missing batch");
       }
     }
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
+    final BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
     preAlloc.preAllocate(4 * this.vector4.getTotalCount());
     aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
   }
@@ -75,12 +76,12 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
   * @param recordCount the number of records to be sorted
   * @return the number of bytes of SV4 memory needed (4 bytes per record)
    */
-  public static long memoryNeeded(int recordCount) {
+  public static long memoryNeeded(final int recordCount) {
     // We need 4 bytes (SV4) for each record.
     return recordCount * 4;
   }
 
-  private int merge(int leftStart, int rightStart, int rightEnd, int outStart) {
+  private int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {
     int l = leftStart;
     int r = rightStart;
     int o = outStart;
@@ -107,17 +108,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
   }
 
   @Override
-  public void sort(VectorContainer container) {
-    Stopwatch watch = new Stopwatch();
+  public void sort(final VectorContainer container) {
+    final Stopwatch watch = new Stopwatch();
     watch.start();
     while (runStarts.size() > 1) {
+
+      // frequently check whether we've been cancelled or have failed
+      if (!context.shouldContinue()) {
+        return;
+      }
+
       int outIndex = 0;
       newRunStarts = Queues.newLinkedBlockingQueue();
       newRunStarts.add(outIndex);
-      int size = runStarts.size();
+      final int size = runStarts.size();
       for (int i = 0; i < size / 2; i++) {
-        int left = runStarts.poll();
-        int right = runStarts.poll();
+        final int left = runStarts.poll();
+        final int right = runStarts.poll();
         Integer end = runStarts.peek();
         if (end == null) {
           end = vector4.getTotalCount();
@@ -130,7 +137,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       if (outIndex < vector4.getTotalCount()) {
         copyRun(outIndex, vector4.getTotalCount());
       }
-      SelectionVector4 tmp = aux.createNewWrapperCurrent();
+      final SelectionVector4 tmp = aux.createNewWrapperCurrent();
       aux.clear();
       aux = this.vector4.createNewWrapperCurrent();
       vector4.clear();
@@ -141,23 +148,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
     aux.clear();
   }
 
-  private void copyRun(int start, int end) {
+  private void copyRun(final int start, final int end) {
     for (int i = start; i < end; i++) {
       aux.set(i, vector4.get(i));
     }
   }
 
   @Override
-  public void swap(int sv0, int sv1) {
-    int tmp = vector4.get(sv0);
+  public void swap(final int sv0, final int sv1) {
+    final int tmp = vector4.get(sv0);
     vector4.set(sv0, vector4.get(sv1));
     vector4.set(sv1, tmp);
   }
 
   @Override
-  public int compare(int leftIndex, int rightIndex) {
-    int sv1 = vector4.get(leftIndex);
-    int sv2 = vector4.get(rightIndex);
+  public int compare(final int leftIndex, final int rightIndex) {
+    final int sv1 = vector4.get(leftIndex);
+    final int sv2 = vector4.get(rightIndex);
     compares++;
     return doEval(sv1, sv2);
   }

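The setup() loop above recovers the batch number of each entry with
"this.vector4.get(i) >>> 16"; that works because an SV4 entry packs the batch
index into the upper 16 bits and the in-batch record offset into the lower
16, which also lines up with the Character.MAX_VALUE (65535) cap visible in
setup(). A small self-contained codec sketch; the helper names are ours, not
Drill's:

    public class Sv4CodecSketch {
      static int encode(final int batchIndex, final int recordIndex) {
        return (batchIndex << 16) | (recordIndex & 0xFFFF);  // pack batch and offset
      }

      static int batchOf(final int entry) {
        return entry >>> 16;                                 // upper 16 bits
      }

      static int recordOf(final int entry) {
        return entry & 0xFFFF;                               // lower 16 bits
      }

      public static void main(final String[] args) {
        final int e = encode(3, 42);
        System.out.println(batchOf(e) + ":" + recordOf(e));  // prints 3:42
      }
    }
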
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index 1aadaa2..5e0cd82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.proto.helper;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -29,22 +28,32 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 public class QueryIdHelper {
 
   /* Generate a UUID from the two parts of the queryid */
-  public static String getQueryId(QueryId queryId) {
+  public static String getQueryId(final QueryId queryId) {
     return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
   }
 
-  public static QueryId getQueryIdFromString(String queryId) {
-    UUID uuid = UUID.fromString(queryId);
+  public static QueryId getQueryIdFromString(final String queryId) {
+    final UUID uuid = UUID.fromString(queryId);
     return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
   }
 
-  public static String getQueryIdentifier(FragmentHandle h) {
+  public static String getQueryIdentifier(final FragmentHandle h) {
     return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
   }
 
-  public static String getQueryIdentifiers(QueryId queryId, int majorFragmentId, List<Integer> minorFragmentIds) {
-    String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
+  public static String getExecutorThreadName(final FragmentHandle fragmentHandle) {
+    return String.format("%s:frag:%s:%s",
+        getQueryId(fragmentHandle.getQueryId()),
+        fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+  }
+
+  public static String getQueryIdentifiers(final QueryId queryId, final int majorFragmentId, final List<Integer> minorFragmentIds) {
+    final String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
     return getQueryId(queryId) + ":" + majorFragmentId + ":" + fragmentIds;
   }
 
+  public static String getFragmentId(final FragmentHandle fragmentHandle) {
+    return fragmentHandle.getMajorFragmentId() + ":" + fragmentHandle.getMinorFragmentId();
+  }
+
 }

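For reference, the identifier formats produced by the new helpers follow
directly from the code above: getExecutorThreadName() yields
"<queryId>:frag:<majorId>:<minorId>" and getFragmentId() yields
"<majorId>:<minorId>". A quick illustration; the UUID and fragment ids below
are made up:

    import java.util.UUID;

    public class QueryIdFormatDemo {
      public static void main(final String[] args) {
        final UUID queryId = UUID.fromString("2b2e2f3a-4c5d-4e7f-8a9b-0c1d2e3f4a5b");
        final int majorFragmentId = 2;
        final int minorFragmentId = 7;

        // same format string as getExecutorThreadName()
        System.out.println(String.format("%s:frag:%s:%s",
            queryId, majorFragmentId, minorFragmentId));

        // same concatenation as getFragmentId()
        System.out.println(majorFragmentId + ":" + minorFragmentId);
      }
    }
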
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 2bb29e5..215f580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -41,15 +41,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   protected BatchState state;
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
     this(popConfig, context, true, new OperatorContext(popConfig, context, true));
   }
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
     this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
   }
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema, OperatorContext oContext) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException {
     super();
     this.context = context;
     this.popConfig = popConfig;
@@ -84,16 +84,18 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return popConfig;
   }
 
-  public final IterOutcome next(RecordBatch b) {
-
+  public final IterOutcome next(final RecordBatch b) {
+    if(!context.shouldContinue()) {
+      return IterOutcome.STOP;
+    }
     return next(0, b);
   }
 
-  public final IterOutcome next(int inputIndex, RecordBatch b){
+  public final IterOutcome next(final int inputIndex, final RecordBatch b){
     IterOutcome next = null;
     stats.stopProcessing();
     try{
-      if (context.isCancelled()) {
+      if (!context.shouldContinue()) {
         return IterOutcome.STOP;
       }
       next = b.next();
@@ -141,7 +143,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
         default:
           return innerNext();
       }
-    } catch (SchemaChangeException e) {
+    } catch (final SchemaChangeException e) {
       throw new DrillRuntimeException(e);
     } finally {
       stats.stopProcessing();
@@ -159,7 +161,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     killIncoming(sendUpstream);
   }
 
@@ -182,12 +184,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return container.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return container.getValueAccessorById(clazz, ids);
   }
 
@@ -195,7 +197,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   @Override
   public WritableBatch getWritableBatch() {
 //    logger.debug("Getting writable batch.");
-    WritableBatch batch = WritableBatch.get(this);
+    final WritableBatch batch = WritableBatch.get(this);
     return batch;
 
   }

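One detail in next(int inputIndex, RecordBatch b) above deserves a note: the
operator calls stats.stopProcessing() before invoking the child's next(), so
time spent in the child is charged to the child's own stats rather than
double-counted. The matching resume call is outside the hunk shown, so the
bracketing below is an assumed shape, using a stand-in timer type rather than
Drill's OperatorStats:

    import java.util.concurrent.Callable;

    public class TimerBracketSketch {
      interface OperatorTimer {
        void stopProcessing();
        void startProcessing();
      }

      static <T> T callChild(final OperatorTimer stats, final Callable<T> child) throws Exception {
        stats.stopProcessing();     // stop charging this operator
        try {
          return child.call();      // the child charges its own timer
        } finally {
          stats.startProcessing();  // assumed: resume charging this operator
        }
      }

      public static void main(final String[] args) throws Exception {
        final OperatorTimer timer = new OperatorTimer() {
          @Override public void stopProcessing()  { System.out.println("pause parent"); }
          @Override public void startProcessing() { System.out.println("resume parent"); }
        };
        System.out.println(callChild(timer, () -> "child ran"));
      }
    }
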
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 7d157fe..4f99081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -19,15 +19,9 @@ package org.apache.drill.exec.record;
 
 import io.netty.buffer.ByteBuf;
 
-import java.util.List;
-
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-
-import com.google.common.collect.Lists;
 
 public class FragmentWritableBatch{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
@@ -37,26 +31,25 @@ public class FragmentWritableBatch{
   private final ByteBuf[] buffers;
   private final FragmentRecordBatch header;
 
-  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
+  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
     this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
   }
 
-  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds, WritableBatch batch){
+  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
     this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
   }
 
-  private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentId, RecordBatchDef def, ByteBuf... buffers){
+  private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
     this.buffers = buffers;
-    FragmentRecordBatch.Builder builder = FragmentRecordBatch //
-        .newBuilder() //
-        .setIsLastBatch(isLast) //
-        .setDef(def) //
+    final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+        .setIsLastBatch(isLast)
+        .setDef(def)
         .setQueryId(queryId)
-        .setReceivingMajorFragmentId(receiveMajorFragmentId) //
-        .setSendingMajorFragmentId(sendMajorFragmentId) //
+        .setReceivingMajorFragmentId(receiveMajorFragmentId)
+        .setSendingMajorFragmentId(sendMajorFragmentId)
         .setSendingMinorFragmentId(sendMinorFragmentId);
 
-    for(int i : receiveMinorFragmentId){
+    for(final int i : receiveMinorFragmentId){
       builder.addReceivingMinorFragmentId(i);
     }
 
@@ -64,31 +57,32 @@ public class FragmentWritableBatch{
   }
 
 
-  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){
+  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
     return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
   }
 
-  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds){
+  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
     return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
   }
 
 
-  public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId,
-                                                             int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+  public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
+                                                             final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
     return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
         receiveMinorFragmentId, schema);
   }
 
-  public static FragmentWritableBatch getEmptyBatchWithSchema(boolean isLast, QueryId queryId, int sendMajorFragmentId,
-      int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+  public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
+      final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
 
-    List<SerializedField> fields = Lists.newArrayList();
-    for (MaterializedField field : schema) {
-      fields.add(field.getSerializedField());
+    final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+    if (schema != null) {
+      for (final MaterializedField field : schema) {
+        def.addField(field.getSerializedField());
+      }
     }
-    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
     return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
-        new int[]{receiveMinorFragmentId}, def);
+        new int[] { receiveMinorFragmentId }, def.build());
   }
 
   public ByteBuf[] getBuffers(){
@@ -97,7 +91,7 @@ public class FragmentWritableBatch{
 
   public long getByteCount() {
     long n = 0;
-    for (ByteBuf buf : buffers) {
+    for (final ByteBuf buf : buffers) {
       n += buf.readableBytes();
     }
     return n;

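The rewritten getEmptyBatchWithSchema() above replaces the collect-then-build
sequence with an incremental RecordBatchDef.Builder and, in doing so,
tolerates a null schema by emitting an empty definition. A stand-alone
analogue of that guard; List<String> stands in for the protobuf builder and
SerializedField types:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class NullTolerantDefSketch {
      static List<String> buildDef(final Iterable<String> schema) {
        final List<String> def = new ArrayList<>();
        if (schema != null) {  // the new guard: a null schema yields an empty def
          for (final String field : schema) {
            def.add(field);
          }
        }
        return def;
      }

      public static void main(final String[] args) {
        System.out.println(buildDef(Arrays.asList("a", "b")));  // [a, b]
        System.out.println(buildDef(null));                     // []
      }
    }
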
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index 33e0665..e18b94c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -19,14 +19,14 @@ package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
-import java.io.IOException;
-
 public class DataResponseHandlerImpl implements DataResponseHandler{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
   private final WorkerBee bee;
@@ -45,7 +45,7 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
       final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
 //      logger.debug("Fragment Batch received {}", fragmentBatch);
 
-    boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
+    final boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
     if (canRun) {
 //    logger.debug("Arriving batch means local batch can run, starting local batch.");
       /*
@@ -54,10 +54,5 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
        */
       bee.startFragmentPendingRemote(manager);
     }
-    if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
-//    logger.debug("Removing handler.  Is Last Batch {}.  Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
-//        manager.isWaiting());
-      bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c15bb7c..e7a9a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -76,13 +76,13 @@ public class Drillbit implements AutoCloseable {
     Drillbit bit;
     try {
       bit = new Drillbit(config, remoteServiceSet);
-    } catch (Exception ex) {
+    } catch (final Exception ex) {
       throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
     }
 
     try {
       bit.run();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       bit.close();
       throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
     }
@@ -131,7 +131,7 @@ public class Drillbit implements AutoCloseable {
 
     // parse out the properties, validate, and then set them
     final String systemProps[] = allSystemProps.split(",");
-    for(String systemProp : systemProps) {
+    for(final String systemProp : systemProps) {
       final String keyValue[] = systemProp.split("=");
       if (keyValue.length != 2) {
         throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
@@ -162,7 +162,7 @@ public class Drillbit implements AutoCloseable {
   }
 
   public static void main(final String[] cli) throws DrillbitStartupException {
-    StartupOptions options = StartupOptions.parse(cli);
+    final StartupOptions options = StartupOptions.parse(cli);
     start(options);
   }
 
@@ -174,7 +174,7 @@ public class Drillbit implements AutoCloseable {
   private final Server embeddedJetty;
   private RegistrationHandle registrationHandle;
 
-  public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+  public Drillbit(final DrillConfig config, final RemoteServiceSet serviceSet) throws Exception {
     final long startTime = System.currentTimeMillis();
     logger.debug("Construction started.");
     final boolean allowPortHunting = serviceSet != null;
@@ -269,14 +269,14 @@ public class Drillbit implements AutoCloseable {
 
     try {
       Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while sleeping during coordination deregistration.");
     }
 
     if (embeddedJetty != null) {
       try {
         embeddedJetty.stop();
-      } catch (Exception e) {
+      } catch (final Exception e) {
         logger.warn("Failure while shutting down embedded jetty server.");
       }
     }
@@ -323,7 +323,7 @@ public class Drillbit implements AutoCloseable {
       logger.info("Received shutdown request.");
       try {
         drillbit.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         throw new RuntimeException("Caught exception closing Drillbit started from\n" + stackTrace, e);
       }
     }

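Beyond the added final qualifiers, the Drillbit hunk above also shows the
expected shape of the system-properties string: a comma-separated list of
key=value pairs, each validated to contain exactly one '='. A self-contained
analogue of that parsing loop; the property names in main() are made up:

    import java.util.HashMap;
    import java.util.Map;

    public class SystemPropsParseDemo {
      static Map<String, String> parse(final String allSystemProps) {
        final Map<String, String> out = new HashMap<>();
        for (final String systemProp : allSystemProps.split(",")) {
          final String[] keyValue = systemProp.split("=");
          if (keyValue.length != 2) {  // mirrors the throwInvalidSystemOption(...) check
            throw new IllegalArgumentException(
                systemProp + " does not contain a key=value assignment");
          }
          out.put(keyValue[0], keyValue[1]);
        }
        return out;
      }

      public static void main(final String[] args) {
        System.out.println(parse("prop.a=1,prop.b=2"));  // {prop.a=1, prop.b=2}
      }
    }
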
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
index e9024ff..20f76a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -26,67 +26,79 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 
 interface Comparators {
   final static Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
-    public int compare(MajorFragmentProfile o1, MajorFragmentProfile o2) {
+    public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) {
       return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getStartTime(), o2.getStartTime());
     }
   };
 
+  final static Comparator<MinorFragmentProfile> lastUpdateCompare = new Comparator<MinorFragmentProfile>() {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+      return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
+    }
+  };
+
+  final static Comparator<MinorFragmentProfile> lastProgressCompare = new Comparator<MinorFragmentProfile>() {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+      return Long.compare(o1.getLastProgress(), o2.getLastProgress());
+    }
+  };
+
   final static Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime(), o2.getEndTime());
     }
   };
 
   final static Comparator<MinorFragmentProfile> fragPeakMemAllocated = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed());
     }
   };
 
   final static Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
     }
   };
 
   final static Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
-    public int compare(OperatorProfile o1, OperatorProfile o2) {
+    public int compare(final OperatorProfile o1, final OperatorProfile o2) {
       return Long.compare(o1.getOperatorId(), o2.getOperatorId());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> opPeakMem = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated());
     }
   };
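
The new lastUpdateCompare and lastProgressCompare comparators get consumed downstream via Collections.min/max rather than sort-then-index, which scans once in O(n) and leaves the list unmutated. A minimal self-contained sketch of that pattern; the Profile class below is a hypothetical stand-in for MinorFragmentProfile:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    class MinMaxSketch {
      // Hypothetical stand-in for MinorFragmentProfile, reduced to the compared field.
      static class Profile {
        private final long lastUpdate;
        Profile(final long lastUpdate) { this.lastUpdate = lastUpdate; }
        long getLastUpdate() { return lastUpdate; }
      }

      static final Comparator<Profile> lastUpdateCompare = new Comparator<Profile>() {
        public int compare(final Profile o1, final Profile o2) {
          return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
        }
      };

      public static void main(final String[] args) {
        final List<Profile> profiles = new ArrayList<>();
        profiles.add(new Profile(100L));
        profiles.add(new Profile(300L));
        profiles.add(new Profile(200L));
        // One O(n) pass; unlike Collections.sort(), the list order is untouched.
        final Profile mostRecent = Collections.max(profiles, lastUpdateCompare);
        System.out.println(mostRecent.getLastUpdate()); // prints 300
      }
    }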

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
index 3a66327..b245b30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
@@ -32,7 +32,7 @@ public class FragmentWrapper {
   private final MajorFragmentProfile major;
   private final long start;
 
-  public FragmentWrapper(MajorFragmentProfile major, long start) {
+  public FragmentWrapper(final MajorFragmentProfile major, final long start) {
     this.major = Preconditions.checkNotNull(major);
     this.start = start;
   }
@@ -45,44 +45,51 @@ public class FragmentWrapper {
     return String.format("fragment-%s", major.getMajorFragmentId());
   }
 
-  public void addSummary(TableBuilder tb) {
+  public void addSummary(final TableBuilder tb, final int colCount) {
     final String fmt = " (%d)";
-    long t0 = start;
+    final long t0 = start;
 
-    ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
+    final ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
         Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
 
     tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
     tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
 
     if (complete.size() < 1) {
-      tb.appendRepeated("", null, 7);
+      tb.appendRepeated("", null, colCount - 2);
       return;
     }
 
-    int li = complete.size() - 1;
+    final MinorFragmentProfile firstStart = Collections.min(complete, Comparators.startTimeCompare);
+    final MinorFragmentProfile lastStart = Collections.max(complete, Comparators.startTimeCompare);
+    tb.appendMillis(firstStart.getStartTime() - t0, String.format(fmt, firstStart.getMinorFragmentId()));
+    tb.appendMillis(lastStart.getStartTime() - t0, String.format(fmt, lastStart.getMinorFragmentId()));
 
-    Collections.sort(complete, Comparators.startTimeCompare);
-    tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-    tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
-    Collections.sort(complete, Comparators.endTimeCompare);
-    tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-    tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+    final MinorFragmentProfile firstEnd = Collections.min(complete, Comparators.endTimeCompare);
+    final MinorFragmentProfile lastEnd = Collections.max(complete, Comparators.endTimeCompare);
+    tb.appendMillis(firstEnd.getEndTime() - t0, String.format(fmt, firstEnd.getMinorFragmentId()));
+    tb.appendMillis(lastEnd.getEndTime() - t0, String.format(fmt, lastEnd.getMinorFragmentId()));
 
     long total = 0;
-    for (MinorFragmentProfile p : complete) {
+    for (final MinorFragmentProfile p : complete) {
       total += p.getEndTime() - p.getStartTime();
     }
-    Collections.sort(complete, Comparators.runTimeCompare);
-    tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
-        String.format(fmt, complete.get(0).getMinorFragmentId()));
+
+    final MinorFragmentProfile shortRun = Collections.min(complete, Comparators.runTimeCompare);
+    final MinorFragmentProfile longRun = Collections.max(complete, Comparators.runTimeCompare);
+
+    tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime(), String.format(fmt, shortRun.getMinorFragmentId()));
     tb.appendMillis((long) (total / complete.size()), null);
-    tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
-        String.format(fmt, complete.get(li).getMinorFragmentId()));
+    tb.appendMillis(longRun.getEndTime() - longRun.getStartTime(), String.format(fmt, longRun.getMinorFragmentId()));
 
-    Collections.sort(complete, Comparators.fragPeakMemAllocated);
-    tb.appendBytes(complete.get(li).getMaxMemoryUsed(), null);
+    final MinorFragmentProfile lastUpdate = Collections.max(complete, Comparators.lastUpdateCompare);
+    tb.appendTime(lastUpdate.getLastUpdate(), null);
+
+    final MinorFragmentProfile lastProgress = Collections.max(complete, Comparators.lastProgressCompare);
+    tb.appendTime(lastProgress.getLastProgress(), null);
+
+    final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragPeakMemAllocated);
+    tb.appendBytes(maxMem.getMaxMemoryUsed(), null);
   }
 
   public String getContent() {
@@ -90,9 +97,10 @@ public class FragmentWrapper {
   }
 
 
-  public String majorFragmentTimingProfile(MajorFragmentProfile major) {
-    final String[] columns = {"Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches", "Peak Memory", "State"};
-    TableBuilder builder = new TableBuilder(columns);
+  public String majorFragmentTimingProfile(final MajorFragmentProfile major) {
+    final String[] columns = { "Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches",
+        "Last Update", "Last Progress", "Peak Memory", "State" };
+    final TableBuilder builder = new TableBuilder(columns);
 
     ArrayList<MinorFragmentProfile> complete, incomplete;
     complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
@@ -101,17 +109,17 @@ public class FragmentWrapper {
         major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
 
     Collections.sort(complete, Comparators.minorIdCompare);
-    for (MinorFragmentProfile minor : complete) {
-      ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
+    for (final MinorFragmentProfile minor : complete) {
+      final ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
 
-      long t0 = start;
+      final long t0 = start;
       long biggestIncomingRecords = 0;
       long biggestBatches = 0;
 
-      for (OperatorProfile op : ops) {
+      for (final OperatorProfile op : ops) {
         long incomingRecords = 0;
         long batches = 0;
-        for (StreamProfile sp : op.getInputProfileList()) {
+        for (final StreamProfile sp : op.getInputProfileList()) {
           incomingRecords += sp.getRecords();
           batches += sp.getBatches();
         }
@@ -127,14 +135,17 @@ public class FragmentWrapper {
 
       builder.appendFormattedInteger(biggestIncomingRecords, null);
       builder.appendFormattedInteger(biggestBatches, null);
+
+      builder.appendTime(minor.getLastUpdate(), null);
+      builder.appendTime(minor.getLastProgress(), null);
       builder.appendBytes(minor.getMaxMemoryUsed(), null);
       builder.appendCell(minor.getState().name(), null);
     }
-    for (MinorFragmentProfile m : incomplete) {
+    for (final MinorFragmentProfile m : incomplete) {
       builder.appendCell(
           major.getMajorFragmentId() + "-"
               + m.getMinorFragmentId(), null);
-      builder.appendRepeated(m.getState().toString(), null, 6);
+      builder.appendRepeated(m.getState().toString(), null, columns.length - 1);
     }
     return builder.toString();
   }
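
Replacing the hard-coded appendRepeated(..., 6) with columns.length - 1 ties row padding to the header array, so adding a column cannot leave incomplete rows short. A minimal sketch of the idea with hypothetical values:

    import java.util.ArrayList;
    import java.util.List;

    class PaddingSketch {
      public static void main(final String[] args) {
        final String[] columns = { "Minor Fragment", "Host", "Start", "End", "State" };
        final List<String> row = new ArrayList<>();
        row.add("2-3"); // the one cell we actually have for an incomplete fragment
        // Derive the pad count from the header array rather than a literal, so the
        // row width always matches the header width.
        final int padding = columns.length - row.size();
        for (int i = 0; i < padding; i++) {
          row.add("CANCELLED");
        }
        System.out.println(row); // 5 cells, matching the 5 headers
      }
    }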

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
index 4f4fcdb..7a1d9b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -81,7 +81,6 @@ public class OperatorWrapper {
     CoreOperatorType operatorType = CoreOperatorType.valueOf(ops.get(0).getLeft().getOperatorType());
     tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null);
 
-    int li = ops.size() - 1;
     String fmt = " (%s)";
 
     double setupSum = 0.0;
@@ -95,23 +94,26 @@ public class OperatorWrapper {
       memSum += ip.getLeft().getPeakLocalMemoryAllocated();
     }
 
-    Collections.sort(ops, Comparators.setupTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortSetup = Collections.min(ops, Comparators.setupTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longSetup = Collections.max(ops, Comparators.setupTimeSort);
+    tb.appendNanos(shortSetup.getLeft().getSetupNanos(), String.format(fmt, shortSetup.getRight()));
     tb.appendNanos((long) (setupSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longSetup.getLeft().getSetupNanos(), String.format(fmt, longSetup.getRight()));
 
-    Collections.sort(ops, Comparators.processTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortProcess = Collections.min(ops, Comparators.processTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longProcess = Collections.max(ops, Comparators.processTimeSort);
+    tb.appendNanos(shortProcess.getLeft().getProcessNanos(), String.format(fmt, shortProcess.getRight()));
     tb.appendNanos((long) (processSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longProcess.getLeft().getProcessNanos(), String.format(fmt, longProcess.getRight()));
 
-    Collections.sort(ops, Comparators.waitTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortWait = Collections.min(ops, Comparators.waitTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longWait = Collections.max(ops, Comparators.waitTimeSort);
+    tb.appendNanos(shortWait.getLeft().getWaitNanos(), String.format(fmt, shortWait.getRight()));
     tb.appendNanos((long) (waitSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longWait.getLeft().getWaitNanos(), String.format(fmt, longWait.getRight()));
 
-    Collections.sort(ops, Comparators.opPeakMem);
+    final ImmutablePair<OperatorProfile, Integer> peakMem = Collections.max(ops, Comparators.opPeakMem);
     tb.appendBytes((long) (memSum / ops.size()), null);
-    tb.appendBytes(ops.get(li).getLeft().getPeakLocalMemoryAllocated(), null);
+    tb.appendBytes(peakMem.getLeft().getPeakLocalMemoryAllocated(), null);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 80016aa..479e655 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -38,7 +38,7 @@ public class ProfileWrapper {
   public QueryProfile profile;
   public String id;
 
-  public ProfileWrapper(QueryProfile profile) {
+  public ProfileWrapper(final QueryProfile profile) {
     this.profile = profile;
     this.id = QueryIdHelper.getQueryId(profile.getId());
   }
@@ -56,25 +56,25 @@ public class ProfileWrapper {
   }
 
   public List<OperatorWrapper> getOperatorProfiles() {
-    List<OperatorWrapper> ows = Lists.newArrayList();
-    Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
+    final List<OperatorWrapper> ows = Lists.newArrayList();
+    final Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
 
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
     Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
+    for (final MajorFragmentProfile major : majors) {
 
-      List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
+      final List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
       Collections.sort(minors, Comparators.minorIdCompare);
-      for (MinorFragmentProfile minor : minors) {
+      for (final MinorFragmentProfile minor : minors) {
 
-        List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
+        final List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
         Collections.sort(ops, Comparators.operatorIdCompare);
-        for (OperatorProfile op : ops) {
+        for (final OperatorProfile op : ops) {
 
-          ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
+          final ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
               major.getMajorFragmentId(), op.getOperatorId());
           if (!opmap.containsKey(ip)) {
-            List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
+            final List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
             opmap.put(ip, l);
           }
           opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
@@ -82,10 +82,10 @@ public class ProfileWrapper {
       }
     }
 
-    List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
+    final List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
     Collections.sort(keys);
 
-    for (ImmutablePair<Integer, Integer> ip : keys) {
+    for (final ImmutablePair<Integer, Integer> ip : keys) {
       ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
     }
 
@@ -93,11 +93,11 @@ public class ProfileWrapper {
   }
 
   public List<FragmentWrapper> getFragmentProfiles() {
-    List<FragmentWrapper> fws = Lists.newArrayList();
+    final List<FragmentWrapper> fws = Lists.newArrayList();
 
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
     Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
+    for (final MajorFragmentProfile major : majors) {
       fws.add(new FragmentWrapper(major, profile.getStart()));
     }
 
@@ -105,10 +105,11 @@ public class ProfileWrapper {
   }
 
   public String getFragmentsOverview() {
-    final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax", "memmax"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (FragmentWrapper fw : getFragmentProfiles()) {
-      fw.addSummary(tb);
+    final String[] columns = { "Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End",
+        "Last End", "tmin", "tavg", "tmax", "last update", "last progress", "memmax" };
+    final TableBuilder tb = new TableBuilder(columns);
+    for (final FragmentWrapper fw : getFragmentProfiles()) {
+      fw.addSummary(tb, columns.length);
     }
     return tb.toString();
   }
@@ -117,17 +118,17 @@ public class ProfileWrapper {
 
   public String getOperatorsOverview() {
     final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)", "Mem (avg)", "Mem (max)"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (OperatorWrapper ow : getOperatorProfiles()) {
+    final TableBuilder tb = new TableBuilder(columns);
+    for (final OperatorWrapper ow : getOperatorProfiles()) {
       ow.addSummary(tb);
     }
     return tb.toString();
   }
 
   public String getOperatorsJSON() {
-    StringBuilder sb = new StringBuilder("{");
+    final StringBuilder sb = new StringBuilder("{");
     String sep = "";
-    for (CoreOperatorType op : CoreOperatorType.values()) {
+    for (final CoreOperatorType op : CoreOperatorType.values()) {
       sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
       sep = ", ";
     }


[4/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
index 72f4436..e91404f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
@@ -25,23 +25,25 @@ import java.util.Date;
 import java.util.Locale;
 
 class TableBuilder {
-  NumberFormat format = NumberFormat.getInstance(Locale.US);
-  SimpleDateFormat hours = new SimpleDateFormat("HH:mm");
-  SimpleDateFormat shours = new SimpleDateFormat("H:mm");
-  SimpleDateFormat mins = new SimpleDateFormat("mm:ss");
-  SimpleDateFormat smins = new SimpleDateFormat("m:ss");
-
-  SimpleDateFormat secs = new SimpleDateFormat("ss.SSS");
-  SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS");
-  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
-  DecimalFormat dec = new DecimalFormat("0.00");
-  DecimalFormat intformat = new DecimalFormat("#,###");
-
-  StringBuilder sb;
-  int w = 0;
-  int width;
-
-  public TableBuilder(String[] columns) {
+  private final NumberFormat format = NumberFormat.getInstance(Locale.US);
+  private final SimpleDateFormat days = new SimpleDateFormat("DD'd'hh'h'mm'm'");
+  private final SimpleDateFormat sdays = new SimpleDateFormat("D'd'hh'h'mm'm'");
+  private final SimpleDateFormat hours = new SimpleDateFormat("HH'h'mm'm'");
+  private final SimpleDateFormat shours = new SimpleDateFormat("H'h'mm'm'");
+  private final SimpleDateFormat mins = new SimpleDateFormat("mm'm'ss's'");
+  private final SimpleDateFormat smins = new SimpleDateFormat("m'm'ss's'");
+
+  private final SimpleDateFormat secs = new SimpleDateFormat("ss.SSS's'");
+  private final SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS's'");
+  private final DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
+  private final DecimalFormat dec = new DecimalFormat("0.00");
+  private final DecimalFormat intformat = new DecimalFormat("#,###");
+
+  private StringBuilder sb;
+  private int w = 0;
+  private int width;
+
+  public TableBuilder(final String[] columns) {
     sb = new StringBuilder();
     width = columns.length;
 
@@ -49,13 +51,13 @@ class TableBuilder {
     format.setMinimumFractionDigits(3);
 
     sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
-    for (String cn : columns) {
+    for (final String cn : columns) {
       sb.append("<th>" + cn + "</th>");
     }
     sb.append("</tr>\n");
   }
 
-  public void appendCell(String s, String link) {
+  public void appendCell(final String s, final String link) {
     if (w == 0) {
       sb.append("<tr>");
     }
@@ -66,22 +68,27 @@ class TableBuilder {
     }
   }
 
-  public void appendRepeated(String s, String link, int n) {
+  public void appendRepeated(final String s, final String link, final int n) {
     for (int i = 0; i < n; i++) {
       appendCell(s, link);
     }
   }
 
-  public void appendTime(long d, String link) {
+  public void appendTime(final long d, final String link) {
     appendCell(dateFormat.format(d), link);
   }
 
-  public void appendMillis(long p, String link) {
-    double secs = p/1000.0;
-    double mins = secs/60;
-    double hours = mins/60;
+  public void appendMillis(final long p, final String link) {
+    final double secs = p/1000.0;
+    final double mins = secs/60;
+    final double hours = mins/60;
+    final double days = hours / 24;
     SimpleDateFormat timeFormat = null;
-    if(hours >= 10){
+    if (days >= 10) {
+      timeFormat = this.days;
+    } else if (days >= 1) {
+      timeFormat = this.sdays;
+    } else if (hours >= 10) {
       timeFormat = this.hours;
     }else if(hours >= 1){
       timeFormat = this.shours;
@@ -97,30 +104,30 @@ class TableBuilder {
     appendCell(timeFormat.format(new Date(p)), null);
   }
 
-  public void appendNanos(long p, String link) {
+  public void appendNanos(final long p, final String link) {
     appendMillis((long) (p / 1000.0 / 1000.0), link);
   }
 
-  public void appendFormattedNumber(Number n, String link) {
+  public void appendFormattedNumber(final Number n, final String link) {
     appendCell(format.format(n), link);
   }
 
-  public void appendFormattedInteger(long n, String link) {
+  public void appendFormattedInteger(final long n, final String link) {
     appendCell(intformat.format(n), link);
   }
 
-  public void appendInteger(long l, String link) {
+  public void appendInteger(final long l, final String link) {
     appendCell(Long.toString(l), link);
   }
 
-  public void appendBytes(long l, String link){
+  public void appendBytes(final long l, final String link){
     appendCell(bytePrint(l), link);
   }
 
-  private String bytePrint(long size){
-    double m = size/Math.pow(1024, 2);
-    double g = size/Math.pow(1024, 3);
-    double t = size/Math.pow(1024, 4);
+  private String bytePrint(final long size){
+    final double m = size/Math.pow(1024, 2);
+    final double g = size/Math.pow(1024, 3);
+    final double t = size/Math.pow(1024, 4);
     if (t > 1) {
       return dec.format(t).concat("TB");
     } else if (g > 1) {
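
The appendMillis() cascade picks a format by the magnitude of the duration. Formatting a duration through SimpleDateFormat treats it as an epoch timestamp, so the result depends on the JVM's default timezone; a minimal sketch of the same threshold cascade using plain arithmetic instead (the method name and output shapes here are illustrative):

    class DurationSketch {
      static String format(final long millis) {
        final double secs = millis / 1000.0;
        final double mins = secs / 60;
        final double hours = mins / 60;
        final double days = hours / 24;
        if (days >= 1) {
          return String.format("%dd%02dh%02dm", (long) days, (long) hours % 24, (long) mins % 60);
        } else if (hours >= 1) {
          return String.format("%dh%02dm", (long) hours, (long) mins % 60);
        } else if (mins >= 1) {
          return String.format("%dm%02ds", (long) mins, (long) secs % 60);
        }
        return String.format("%.3fs", secs);
      }

      public static void main(final String[] args) {
        System.out.println(format(90061000L)); // 25h 1m 1s -> "1d01h01m"
      }
    }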

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e2bcec3..a3ceb8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -23,7 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.SelfCleaningRunnable;
@@ -47,6 +48,7 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryManager;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
@@ -101,7 +103,17 @@ public class WorkManager implements AutoCloseable {
      * threads that can be created. Ideally, this might be computed based on the number of cores or
      * some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice.
      */
-    executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new NamedThreadFactory("WorkManager-")) {
+            @Override
+            protected void afterExecute(final Runnable r, final Throwable t) {
+              if(t != null){
+                logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
+              }
+              super.afterExecute(r, t);
+            }
+      };
+
 
     // TODO references to this escape here (via WorkerBee) before construction is done
     controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
@@ -125,7 +137,7 @@ public class WorkManager implements AutoCloseable {
                   return runningFragments.size();
                 }
           });
-    } catch (IllegalArgumentException e) {
+    } catch (final IllegalArgumentException e) {
       logger.warn("Exception while registering metrics", e);
     }
   }
@@ -160,7 +172,7 @@ public class WorkManager implements AutoCloseable {
       if (executor != null) {
         executor.awaitTermination(1, TimeUnit.SECONDS);
       }
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Executor interrupted while awaiting termination");
     }
   }
@@ -188,7 +200,7 @@ public class WorkManager implements AutoCloseable {
     while(true) {
       try {
         exitLatch.await(5, TimeUnit.SECONDS);
-      } catch(InterruptedException e) {
+      } catch(final InterruptedException e) {
         // keep waiting
       }
       break;
@@ -263,6 +275,7 @@ public class WorkManager implements AutoCloseable {
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
+          workBus.removeFragmentManager(fragmentHandle);
           indicateIfSafeToExit();
         }
       });
@@ -289,28 +302,29 @@ public class WorkManager implements AutoCloseable {
     @Override
     public void run() {
       while(true) {
+        final Controller controller = dContext.getController();
         final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
-        for(FragmentExecutor fragmentExecutor : runningFragments.values()) {
+        for(final FragmentExecutor fragmentExecutor : runningFragments.values()) {
           final FragmentStatus status = fragmentExecutor.getStatus();
           if (status == null) {
             continue;
           }
 
           final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint();
-          futures.add(dContext.getController().getTunnel(ep).sendFragmentStatus(status));
+          futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
         }
 
-        for(DrillRpcFuture<Ack> future : futures) {
+        for(final DrillRpcFuture<Ack> future : futures) {
           try {
             future.checkedGet();
-          } catch(RpcException ex) {
+          } catch(final RpcException ex) {
             logger.info("Failure while sending intermediate fragment status to Foreman", ex);
           }
         }
 
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
-        } catch(InterruptedException e) {
+        } catch(final InterruptedException e) {
           // exit status thread on interrupt.
           break;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3a7123d..3fb0775 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcConstants;
@@ -137,10 +138,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
         drillbitContext.getWorkBus().addFragmentManager(manager);
       }
 
-    } catch (Exception e) {
+    } catch (final Exception e) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Failure while trying to start remote fragment", e);
-    } catch (OutOfMemoryError t) {
+    } catch (final OutOfMemoryError t) {
       if (t.getMessage().startsWith("Direct buffer")) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Out of direct memory while trying to start remote fragment", t);
@@ -171,19 +172,24 @@ public class ControlHandlerImpl implements ControlMessageHandler {
   }
 
   private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
     final FragmentManager manager =
         bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
 
     FragmentExecutor executor;
     if (manager != null) {
-      executor = manager.getRunnable();
+      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
     } else {
       // then try local cancel.
       executor = bee.getFragmentRunner(finishedReceiver.getSender());
-    }
-
-    if (executor != null) {
-      executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      if (executor != null) {
+        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      } else {
+        logger.warn(
+            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+      }
     }
 
     return Acks.OK;
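
The restructured receivingFragmentFinished() tries the fragment manager first, falls back to a locally running executor, and otherwise logs the dropped request while still returning Acks.OK so the remote sender is not left waiting. A minimal sketch of that fallback chain; the names and the String keys standing in for fragment handles are hypothetical:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class FallbackDispatchSketch {
      interface Finishable { void receivingFragmentFinished(String receiver); }

      private final Map<String, Finishable> managers = new ConcurrentHashMap<>();
      private final Map<String, Finishable> localRunners = new ConcurrentHashMap<>();

      void finish(final String sender, final String receiver) {
        final Finishable manager = managers.get(sender);
        if (manager != null) {
          manager.receivingFragmentFinished(receiver);
          return;
        }
        final Finishable executor = localRunners.get(sender);
        if (executor != null) {
          executor.receivingFragmentFinished(receiver);
          return;
        }
        // Neither path is reachable: drop the request (the caller still acks) so
        // the sender never blocks on a fragment that no longer exists.
        System.err.printf("Dropping early termination request for %s -> %s%n", sender, receiver);
      }
    }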

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 2430e64..85262de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -47,10 +47,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
   private int streamCounter;
-  private int fragmentCount;
-  private FragmentContext context;
+  private final int fragmentCount;
+  private final FragmentContext context;
 
-  public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
+  public UnlimitedRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
     bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
 
     this.softlimit = bufferSizePerSocket * fragmentCount;
@@ -63,7 +63,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public void enqueue(RawFragmentBatch batch) throws IOException {
+  public void enqueue(final RawFragmentBatch batch) throws IOException {
+
+    // If this fragment has already been cancelled or has failed, we don't need any more batches. The null
+    // check keeps tests that construct the buffer without a FragmentContext working.
+    if (context != null && !context.shouldContinue()) {
+      this.kill(context);
+    }
+
     if (isFinished()) {
       if (state == BufferState.KILLED) {
         // do not even enqueue just release and send ack back
@@ -76,8 +83,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
     if (batch.getHeader().getIsOutOfMemory()) {
       logger.trace("Setting autoread false");
-      RawFragmentBatch firstBatch = buffer.peekFirst();
-      FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
+      final RawFragmentBatch firstBatch = buffer.peekFirst();
+      final FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
       if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) {
         buffer.addFirst(batch);
       }
@@ -96,16 +103,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void cleanup() {
-    if (!isFinished() && !context.isCancelled()) {
-      String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
+    if (!isFinished() && context.shouldContinue()) {
+      final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
       logger.error(msg);
-      IllegalStateException e = new IllegalStateException(msg);
+      final IllegalStateException e = new IllegalStateException(msg);
       context.fail(e);
       throw e;
     }
 
     if (!buffer.isEmpty()) {
-      if (!context.isFailed() && !context.isCancelled()) {
+      if (context.shouldContinue()) {
         context.fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", buffer.size());
       }
@@ -114,7 +121,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public void kill(FragmentContext context) {
+  public void kill(final FragmentContext context) {
     state = BufferState.KILLED;
     clearBufferWithBody();
   }
@@ -125,7 +132,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
    */
   private void clearBufferWithBody() {
     while (!buffer.isEmpty()) {
-      RawFragmentBatch batch = buffer.poll();
+      final RawFragmentBatch batch = buffer.poll();
       if (batch.getBody() != null) {
         batch.getBody().release();
       }
@@ -160,7 +167,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     if (b == null && (!isFinished() || !buffer.isEmpty())) {
       try {
         b = buffer.take();
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         return null;
         // TODO InterruptedException
       }
@@ -175,9 +182,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
    // try to flush the difference between the softlimit and the queue size, so every flush reduces the backlog;
    // when the queue size is lower than the softlimit, the bigger the difference the more we can flush
     if (!isFinished() && overlimit.get()) {
-      int flushCount = softlimit - buffer.size();
+      final int flushCount = softlimit - buffer.size();
       if ( flushCount > 0 ) {
-        int flushed = readController.flushResponses(flushCount);
+        final int flushed = readController.flushResponses(flushCount);
         logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
         if ( flushed == 0 ) {
           // queue is empty - nothing to do for now
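
The new guard at the top of enqueue() stops batches from piling up behind a fragment that has been cancelled or has failed. A minimal sketch of the guard, assuming a simple flag in place of FragmentContext.shouldContinue():

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    class CancellableBufferSketch {
      private final ConcurrentLinkedQueue<byte[]> buffer = new ConcurrentLinkedQueue<>();
      private final AtomicBoolean shouldContinue = new AtomicBoolean(true);

      void enqueue(final byte[] batch) {
        // If the consumer is gone, drop and drain instead of queueing, so memory
        // is not held for a fragment that will never read it.
        if (!shouldContinue.get()) {
          kill();
          return;
        }
        buffer.add(batch);
      }

      void kill() {
        shouldContinue.set(false);
        buffer.clear(); // the real code also releases each batch's backing buffers
      }
    }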

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index f824b53..d94b9f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -23,15 +23,15 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.drill.common.EventProcessor;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
@@ -77,8 +77,10 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -99,6 +101,7 @@ import com.google.common.collect.Multimap;
 public class Foreman implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
   private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+  private static final int RPC_WAIT_IN_SECONDS = 90;
 
   private final QueryId queryId;
   private final RunQuery queryRequest;
@@ -113,7 +116,7 @@ public class Foreman implements Runnable {
 
   private FragmentExecutor rootRunner; // root Fragment
 
-  private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
   private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
@@ -204,12 +207,12 @@ public class Foreman implements Runnable {
         throw new IllegalStateException();
       }
       injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
-    } catch (ForemanException e) {
+    } catch (final ForemanException e) {
       moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {
       moveToState(QueryState.FAILED,
           new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
-    } catch (OutOfMemoryError e) {
+    } catch (final OutOfMemoryError e) {
       /*
        * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
        * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
@@ -255,9 +258,9 @@ public class Foreman implements Runnable {
       try {
         lease.close();
         lease = null;
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         // if we end up here, the while loop will try again
-      } catch (Exception e) {
+      } catch (final Exception e) {
         logger.warn("Failure while releasing lease.", e);
         break;
       }
@@ -268,7 +271,7 @@ public class Foreman implements Runnable {
     LogicalPlan logicalPlan;
     try {
       logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ForemanException("Failure parsing logical plan.", e);
     }
 
@@ -299,9 +302,9 @@ public class Foreman implements Runnable {
   private void log(final PhysicalPlan plan) {
     if (logger.isDebugEnabled()) {
       try {
-        String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
+        final String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
         logger.debug("Physical {}", planText);
-      } catch (IOException e) {
+      } catch (final IOException e) {
         logger.warn("Error while attempting to log physical plan.", e);
       }
     }
@@ -324,7 +327,7 @@ public class Foreman implements Runnable {
     try {
       final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
       runPhysicalPlan(plan);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ForemanSetupException("Failure while parsing physical plan.", e);
     }
   }
@@ -339,8 +342,8 @@ public class Foreman implements Runnable {
     final PlanFragment rootPlanFragment = work.getRootFragment();
     assert queryId == rootPlanFragment.getHandle().getQueryId();
 
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
     logger.debug("Submitting fragments to run.");
 
@@ -365,7 +368,7 @@ public class Foreman implements Runnable {
   private void setupSortMemoryAllocations(final PhysicalPlan plan) {
     // look for external sorts
     final List<ExternalSort> sortList = new LinkedList<>();
-    for (PhysicalOperator op : plan.getSortedOperators()) {
+    for (final PhysicalOperator op : plan.getSortedOperators()) {
       if (op instanceof ExternalSort) {
         sortList.add((ExternalSort) op);
       }
@@ -382,7 +385,7 @@ public class Foreman implements Runnable {
       final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
       logger.debug("Max sort alloc: {}", maxSortAlloc);
 
-      for(ExternalSort externalSort : sortList) {
+      for(final ExternalSort externalSort : sortList) {
         externalSort.setMaxAllocation(maxSortAlloc);
       }
     }
@@ -403,7 +406,7 @@ public class Foreman implements Runnable {
     if (queuingEnabled) {
       final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
       double totalCost = 0;
-      for (PhysicalOperator ops : plan.getSortedOperators()) {
+      for (final PhysicalOperator ops : plan.getSortedOperators()) {
         totalCost += ops.getCost();
       }
 
@@ -423,7 +426,7 @@ public class Foreman implements Runnable {
 
         final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
         lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         throw new ForemanSetupException("Unable to acquire slot for query.", e);
       }
     }
@@ -447,7 +450,7 @@ public class Foreman implements Runnable {
       final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
       final int fragmentCount = planFragments.size();
       int fragmentIndex = 0;
-      for(PlanFragment planFragment : planFragments) {
+      for(final PlanFragment planFragment : planFragments) {
         final FragmentHandle fragmentHandle = planFragment.getHandle();
         sb.append("PlanFragment(");
         sb.append(++fragmentIndex);
@@ -471,7 +474,7 @@ public class Foreman implements Runnable {
         {
           final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
           jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
-        } catch(Exception e) {
+        } catch(final Exception e) {
           // we've already set jsonString to a fallback value
         }
         sb.append(jsonString);
@@ -567,7 +570,7 @@ public class Foreman implements Runnable {
 
       try {
         autoCloseable.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         /*
          * Even if the query completed successfully, we'll still report failure if we have
          * problems cleaning up.
@@ -582,11 +585,11 @@ public class Foreman implements Runnable {
       Preconditions.checkState(!isClosed);
       Preconditions.checkState(resultState != null);
 
-      logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+      logger.info("foreman cleaning up.");
 
       // These are straight forward removals from maps, so they won't throw.
       drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
-      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
       suppressingClose(queryContext);
 
@@ -612,8 +615,8 @@ public class Foreman implements Runnable {
           .setQueryId(queryId)
           .setQueryState(resultState);
       if (resultException != null) {
-        boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-        UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
+        final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+        final UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
         resultBuilder.addError(uex.getOrCreatePBError(verbose));
       }
 
@@ -626,7 +629,7 @@ public class Foreman implements Runnable {
       try {
         // send whatever result we ended up with
         initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
-      } catch(Exception e) {
+      } catch(final Exception e) {
         addException(e);
         logger.warn("Exception sending result to client", resultException);
       }
@@ -658,7 +661,7 @@ public class Foreman implements Runnable {
     }
 
     @Override
-    protected void processEvent(StateEvent event) {
+    protected void processEvent(final StateEvent event) {
       final QueryState newState = event.newState;
       final Exception exception = event.exception;
 
@@ -803,7 +806,7 @@ public class Foreman implements Runnable {
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
     rootRunner = new FragmentExecutor(rootContext, rootOperator,
-        queryManager.getRootStatusHandler(rootContext));
+        queryManager.newRootStatusHandler(rootContext));
     final RootFragmentManager fragmentManager =
         new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
@@ -836,7 +839,7 @@ public class Foreman implements Runnable {
     final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
 
     // record all fragments for status purposes.
-    for (PlanFragment planFragment : fragments) {
+    for (final PlanFragment planFragment : fragments) {
       logger.trace("Tracking intermediate remote node {} with data {}",
                    planFragment.getAssignment(), planFragment.getFragmentJson());
       queryManager.addFragmentStatusTracker(planFragment, false);
@@ -855,40 +858,53 @@ public class Foreman implements Runnable {
      * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
      * know if any submissions did fail.
      */
-    final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+    final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
     final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
+    for (final DrillbitEndpoint ep : intFragmentMap.keySet()) {
       sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
-    // wait for the status of all requests sent above to be known
-    boolean ready = false;
-    while(!ready) {
-      try {
-        endpointLatch.await();
-        ready = true;
-      } catch (InterruptedException e) {
-        // if we weren't ready, the while loop will continue to wait
-      }
+    if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+      long numberRemaining = endpointLatch.getCount();
+      throw UserException.connectionError()
+          .message(
+              "Exceeded timeout while waiting send intermediate work fragments to remote nodes.  Sent %d and only heard response back from %d nodes.",
+              intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+          .build();
     }
 
+
     // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
-        fragmentSubmitFailures.submissionExceptions;
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
     if (submissionExceptions.size() > 0) {
-      throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
-          submissionExceptions.get(0).rpcException);
-      // TODO indicate the failing drillbit?
-      // TODO report on all the failures?
+      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+
+      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
+        DrillbitEndpoint endpoint = e.drillbitEndpoint;
+        if (endpoints.add(endpoint)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(endpoint.getAddress());
+        }
+      }
+      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
+          .message("Error setting up remote intermediate fragment execution")
+          .addContext("Nodes with failures", sb.toString())
+          .build();
     }
 
     /*
      * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
      * the regular sendListener event delivery.
      */
-    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
       sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
     }
   }
@@ -906,7 +922,7 @@ public class Foreman implements Runnable {
     @SuppressWarnings("resource")
     final Controller controller = drillbitContext.getController();
     final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(PlanFragment planFragment : fragments) {
+    for(final PlanFragment planFragment : fragments) {
       fb.addFragment(planFragment);
     }
     final InitializeFragments initFrags = fb.build();
@@ -926,12 +942,12 @@ public class Foreman implements Runnable {
    */
   private static class FragmentSubmitFailures {
     static class SubmissionException {
-//      final DrillbitEndpoint drillbitEndpoint;
+      final DrillbitEndpoint drillbitEndpoint;
       final RpcException rpcException;
 
       SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
           final RpcException rpcException) {
-//        this.drillbitEndpoint = drillbitEndpoint;
+        this.drillbitEndpoint = drillbitEndpoint;
         this.rpcException = rpcException;
       }
     }
@@ -999,16 +1015,7 @@ public class Foreman implements Runnable {
      *   to the user
      */
     public void moveToState(final QueryState newState, final Exception ex) {
-      boolean ready = false;
-      while(!ready) {
-        try {
-          acceptExternalEvents.await();
-          ready = true;
-        } catch(InterruptedException e) {
-          // if we're still not ready, the while loop will cause us to wait again
-          logger.warn("Interrupted while waiting to move state.", e);
-        }
-      }
+      acceptExternalEvents.awaitUninterruptibly();
 
       Foreman.this.moveToState(newState, ex);
     }

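The hunk above swaps the hand-rolled interruptible wait for ExtendedLatch.awaitUninterruptibly(millis), which returns false on timeout instead of propagating InterruptedException. ExtendedLatch itself is not part of this diff; a minimal sketch of the contract the Foreman relies on, assuming a plain CountDownLatch underneath (the class and method bodies here are illustrative, not Drill's), might look like:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for org.apache.drill.common.concurrent.ExtendedLatch;
    // only the two await variants used in this commit are sketched.
    public class UninterruptibleLatch extends CountDownLatch {
      public UninterruptibleLatch(final int count) {
        super(count);
      }

      // Wait until the count reaches zero, retrying through interrupts.
      public void awaitUninterruptibly() {
        boolean interrupted = false;
        try {
          while (true) {
            try {
              await();
              return;
            } catch (final InterruptedException e) {
              interrupted = true; // remember the interrupt and retry
            }
          }
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for callers
          }
        }
      }

      // Wait up to millis; false means we timed out with work still pending.
      public boolean awaitUninterruptibly(final long millis) {
        final long deadline = System.currentTimeMillis() + millis;
        boolean interrupted = false;
        try {
          while (true) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
              return getCount() == 0;
            }
            try {
              return await(remaining, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException e) {
              interrupted = true; // remember and retry with the reduced budget
            }
          }
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }
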
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 433ab26..ceb77f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -22,11 +22,16 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public class FragmentData {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
+
   private final boolean isLocal;
   private volatile FragmentStatus status;
-  private volatile long lastStatusUpdate = 0;
+  private volatile long lastStatusUpdate = System.currentTimeMillis();
+  private volatile long lastProgress = System.currentTimeMillis();
   private final DrillbitEndpoint endpoint;
 
   public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
@@ -43,13 +48,45 @@ public class FragmentData {
         .build();
   }
 
-  public void setStatus(final FragmentStatus status) {
-    this.status = status;
-    lastStatusUpdate = System.currentTimeMillis();
+  /**
+   * Update the status for this fragment.  Also records last update and last progress time.
+   * @param newStatus Updated status
+   * @return Whether or not the status update resulted in a FragmentState change.
+   */
+  public boolean setStatus(final FragmentStatus newStatus) {
+    final long time = System.currentTimeMillis();
+    final FragmentState oldState = status.getProfile().getState();
+    final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED;
+    final FragmentState currentState = newStatus.getProfile().getState();
+    final boolean stateChanged = currentState != oldState;
+
+    if (inTerminalState) {
+      // already in a terminal state. This shouldn't happen.
+      logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
+          QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
+      return false;
+    }
+
+    this.lastStatusUpdate = time;
+    if (madeProgress(status, newStatus)) {
+      this.lastProgress = time;
+    }
+    status = newStatus;
+
+    return stateChanged;
+  }
+
+  public FragmentState getState() {
+    return status.getProfile().getState();
   }
 
-  public FragmentStatus getStatus() {
-    return status;
+  public MinorFragmentProfile getProfile() {
+    return status
+        .getProfile()
+        .toBuilder()
+        .setLastUpdate(lastStatusUpdate)
+        .setLastProgress(lastProgress)
+        .build();
   }
 
   public boolean isLocal() {
@@ -64,6 +101,34 @@ public class FragmentData {
     return status.getHandle();
   }
 
+  private boolean madeProgress(final FragmentStatus prev, final FragmentStatus cur) {
+    final MinorFragmentProfile previous = prev.getProfile();
+    final MinorFragmentProfile current = cur.getProfile();
+
+    if (previous.getState() != current.getState()) {
+      return true;
+    }
+
+    if (previous.getOperatorProfileCount() != current.getOperatorProfileCount()) {
+      return true;
+    }
+
+    for (int i = 0; i < current.getOperatorProfileCount(); i++) {
+      if (madeProgress(previous.getOperatorProfile(i), current.getOperatorProfile(i))) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private boolean madeProgress(final OperatorProfile prev, final OperatorProfile cur) {
+    return prev.getInputProfileCount() != cur.getInputProfileCount()
+        || !prev.getInputProfileList().equals(cur.getInputProfileList())
+        || prev.getMetricCount() != cur.getMetricCount()
+        || !prev.getMetricList().equals(cur.getMetricList());
+  }
+
   @Override
   public String toString() {
     return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate

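setStatus() now records two timestamps per fragment: lastStatusUpdate moves on every report, while lastProgress only moves when madeProgress() sees the profile actually change. One use this distinction enables, sketched here as a hypothetical heartbeat check (thresholds and names are invented, not from this patch), is telling a stalled fragment apart from a silent one:

    // Hypothetical watchdog built on the lastUpdate/lastProgress distinction
    // introduced above; thresholds and names are illustrative only.
    public class ProgressWatchdog {
      private static final long UPDATE_TIMEOUT_MS = 30_000;    // no heartbeat at all
      private static final long PROGRESS_TIMEOUT_MS = 300_000; // heartbeats, but no change

      public enum Verdict { HEALTHY, UNRESPONSIVE, STALLED }

      public Verdict check(final long lastUpdate, final long lastProgress) {
        final long now = System.currentTimeMillis();
        if (now - lastUpdate > UPDATE_TIMEOUT_MS) {
          return Verdict.UNRESPONSIVE; // fragment stopped reporting entirely
        }
        if (now - lastProgress > PROGRESS_TIMEOUT_MS) {
          return Verdict.STALLED;      // fragment reports, but nothing moves
        }
        return Verdict.HEALTHY;
      }
    }
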
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 31b1f2b..34fa639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -21,11 +21,12 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -50,15 +51,17 @@ import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.StatusReporter;
 
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 
 /**
  * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
  */
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+public class QueryManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
 
   public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
@@ -74,7 +77,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       .ephemeral()
       .build();
 
-  private final Set<DrillbitEndpoint> includedBits;
+  private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
   private final StateListener stateListener;
   private final QueryId queryId;
   private final String stringQueryId;
@@ -94,8 +97,13 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
 
   // the following mutable variables are used to capture ongoing query status
   private String planText;
-  private long startTime;
+  private long startTime = System.currentTimeMillis();
   private long endTime;
+
+  // How many nodes have finished their execution.  Query is complete when all nodes are complete.
+  private final AtomicInteger finishedNodes = new AtomicInteger(0);
+
+  // How many fragments have finished their execution.
   private final AtomicInteger finishedFragments = new AtomicInteger(0);
 
   public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
@@ -109,83 +117,27 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     try {
       profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
       profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-
-    includedBits = Sets.newHashSet();
-  }
-
-  @Override
-  public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
-  }
-
-  @Override
-  public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
-    for(DrillbitEndpoint ep : unregisteredDrillbits) {
-      if (includedBits.contains(ep)) {
-        logger.warn("Drillbit {} no longer registered in cluster.  Canceling query {}",
-            ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
-        stateListener.moveToState(QueryState.FAILED,
-            new ForemanException("One more more nodes lost connectivity during query.  Identified node was "
-                + ep.getAddress()));
-      }
-    }
   }
 
-  @Override
-  public void statusUpdate(final FragmentStatus status) {
-    logger.debug("New fragment status was provided to QueryManager of {}", status);
-    switch(status.getProfile().getState()) {
-    case AWAITING_ALLOCATION:
-    case RUNNING:
-      updateFragmentStatus(status);
-      break;
-
-    case FINISHED:
-      fragmentDone(status);
-      break;
-
-    case CANCELLED:
-      /*
-       * TODO
-       * This doesn't seem right; shouldn't this be similar to FAILED?
-       * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
-       *
-       * So, we add it to the finishedFragments if we ourselves we receive a statusUpdate (from where),
-       * but not if our cancellation listener gets it?
-       */
-      // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
-      fragmentDone(status);
-      break;
-
-    case FAILED:
-      stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
-      break;
-
-    default:
-      throw new UnsupportedOperationException(String.format("Received status of %s", status));
-    }
-  }
-
-  private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+  private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
     final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
     final int majorFragmentId = fragmentHandle.getMajorFragmentId();
     final int minorFragmentId = fragmentHandle.getMinorFragmentId();
-    fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
+    final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
+    return data.setStatus(fragmentStatus);
   }
 
   private void fragmentDone(final FragmentStatus status) {
-    updateFragmentStatus(status);
+    final boolean stateChanged = updateFragmentStatus(status);
 
-    final int finishedFragments = this.finishedFragments.incrementAndGet();
-    final int totalFragments = fragmentDataSet.size();
-    assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
-    final int remaining = totalFragments - finishedFragments;
-    logger.debug("waiting for {} fragments", remaining);
-    if (remaining == 0) {
-      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
-      stateListener.moveToState(QueryState.COMPLETED, null);
+    if (stateChanged) {
+      // since we're in the fragment done clause and this was a change from the previous state
+      final NodeTracker node = nodeMap.get(status.getProfile().getEndpoint());
+      node.fragmentComplete();
+      finishedFragments.incrementAndGet();
     }
   }
 
@@ -201,9 +153,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     }
     minorMap.put(minorFragmentId, fragmentData);
     fragmentDataSet.add(fragmentData);
-
-    // keep track of all the drill bits that are used by this query
-    includedBits.add(fragmentData.getEndpoint());
   }
 
   public String getFragmentStatesAsString() {
@@ -211,7 +160,16 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   }
 
   void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
-    addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
+    final DrillbitEndpoint assignment = fragment.getAssignment();
+
+    NodeTracker tracker = nodeMap.get(assignment);
+    if (tracker == null) {
+      tracker = new NodeTracker(assignment);
+      nodeMap.put(assignment, tracker);
+    }
+
+    tracker.addFragment();
+    addFragment(new FragmentData(fragment.getHandle(), assignment, isRoot));
   }
 
   /**
@@ -219,23 +177,23 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
    */
   void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
     final Controller controller = drillbitContext.getController();
-    for(FragmentData data : fragmentDataSet) {
-      final FragmentStatus fragmentStatus = data.getStatus();
-      switch(fragmentStatus.getProfile().getState()) {
+    for(final FragmentData data : fragmentDataSet) {
+      switch(data.getState()) {
       case SENDING:
       case AWAITING_ALLOCATION:
       case RUNNING:
-        if (rootRunner != null) {
+        if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
             rootRunner.cancel();
         } else {
           final DrillbitEndpoint endpoint = data.getEndpoint();
-          final FragmentHandle handle = fragmentStatus.getHandle();
+          final FragmentHandle handle = data.getHandle();
           // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
           controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
         }
         break;
 
       case FINISHED:
+      case CANCELLATION_REQUESTED:
       case CANCELLED:
       case FAILED:
         // nothing to do
@@ -267,21 +225,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     }
   }
 
-  public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
-    return new RootStatusReporter(context);
-  }
-
-  class RootStatusReporter extends AbstractStatusReporter {
-    private RootStatusReporter(final FragmentContext context) {
-      super(context);
-    }
-
-    @Override
-    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
-      statusUpdate(status);
-    }
-  }
-
   QueryState updateQueryStateInStore(final QueryState queryState) {
     switch (queryState) {
       case PENDING:
@@ -295,7 +238,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       case FAILED:
         try {
           profileEStore.delete(stringQueryId);
-        } catch(Exception e) {
+        } catch(final Exception e) {
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
         }
 
@@ -345,7 +288,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
         for (int v = 0; v < minorMap.allocated.length; v++) {
           if (minorMap.allocated[v]) {
             final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
-            fb.addMinorFragmentProfile(data.getStatus().getProfile());
+            fb.addMinorFragmentProfile(data.getProfile());
           }
         }
         profileBuilder.addFragmentProfile(fb);
@@ -366,4 +309,159 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   void markEndTime() {
     endTime = System.currentTimeMillis();
   }
+
+  /**
+   * Internal class used to track the number of pending completion messages required from a particular node. This
+   * allows us to know, for each node that is part of this query, what portion of its fragments are still outstanding.
+   * In the case of a node failure, we can then correctly track how many outstanding messages will never arrive.
+   */
+  private class NodeTracker {
+    private final DrillbitEndpoint endpoint;
+    private final AtomicInteger totalFragments = new AtomicInteger(0);
+    private final AtomicInteger completedFragments = new AtomicInteger(0);
+
+    public NodeTracker(final DrillbitEndpoint endpoint) {
+      this.endpoint = endpoint;
+    }
+
+    /**
+     * Increments the number of fragments this node is running.
+     */
+    public void addFragment() {
+      totalFragments.incrementAndGet();
+    }
+
+    /**
+     * Increments the number of fragments completed on this node.  Once the number of fragments completed
+     * equals the number of fragments running, this node is marked as finished and finishedNodes is incremented.
+     *
+     * If the number of remaining nodes has been decremented to zero, this will allow the query to move to a completed state.
+     */
+    public void fragmentComplete() {
+      if (totalFragments.get() == completedFragments.incrementAndGet()) {
+        nodeComplete();
+      }
+    }
+
+    /**
+     * Increments the number of fragments completed on this node until we mark this node complete. Note that this uses
+     * the internal fragmentComplete() method, so whether we have failure or success, the nodeComplete event will only
+     * occur once. (Two threads could be updating the fragment count at the same time since this will likely come from
+     * an external event.)
+     */
+    public void nodeDead() {
+      while (completedFragments.get() < totalFragments.get()) {
+        fragmentComplete();
+      }
+    }
+
+  }
+
+  /**
+   * Increments the number of complete nodes. If there are no more pending nodes, moves the query to a terminal
+   * state.
+   */
+  private void nodeComplete() {
+    final int finishedNodes = this.finishedNodes.incrementAndGet();
+    final int totalNodes = nodeMap.size();
+    Preconditions.checkArgument(finishedNodes <= totalNodes, "The finished node count exceeds the total node count");
+    final int remaining = totalNodes - finishedNodes;
+    if (remaining == 0) {
+      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
+      stateListener.moveToState(QueryState.COMPLETED, null);
+    } else {
+      logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
+          this.fragmentDataSet.size() - finishedFragments.get());
+    }
+  }
+
+  public StatusReporter newRootStatusHandler(final FragmentContext context) {
+    return new RootStatusReporter(context);
+  }
+
+  private class RootStatusReporter extends AbstractStatusReporter {
+    private RootStatusReporter(final FragmentContext context) {
+      super(context);
+    }
+
+    @Override
+    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
+      fragmentStatusListener.statusUpdate(status);
+    }
+  }
+
+  public FragmentStatusListener getFragmentStatusListener(){
+    return fragmentStatusListener;
+  }
+
+  private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener() {
+    @Override
+    public void statusUpdate(final FragmentStatus status) {
+      logger.debug("New fragment status was provided to QueryManager of {}", status);
+      switch(status.getProfile().getState()) {
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+      case CANCELLATION_REQUESTED:
+        updateFragmentStatus(status);
+        break;
+
+      case FAILED:
+        stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
+        // fall-through.
+      case FINISHED:
+      case CANCELLED:
+        fragmentDone(status);
+        break;
+
+      default:
+        throw new UnsupportedOperationException(String.format("Received status of %s", status));
+      }
+    }
+  };
+
+
+  public DrillbitStatusListener getDrillbitStatusListener() {
+    return drillbitStatusListener;
+  }
+
+  private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){
+
+    @Override
+    public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
+    }
+
+    @Override
+    public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+      final StringBuilder failedNodeList = new StringBuilder();
+      boolean atLeastOneFailure = false;
+
+      for(final DrillbitEndpoint ep : unregisteredDrillbits) {
+        final NodeTracker tracker = nodeMap.get(ep);
+        if (tracker != null) {
+          // mark node as dead.
+          tracker.nodeDead();
+
+          // capture node name for exception or logging message
+          if (atLeastOneFailure) {
+            failedNodeList.append(", ");
+          }else{
+            atLeastOneFailure = true;
+          }
+          failedNodeList.append(ep.getAddress());
+          failedNodeList.append(":");
+          failedNodeList.append(ep.getUserPort());
+
+        }
+      }
+
+      if (atLeastOneFailure) {
+        logger.warn("Drillbits [{}] no longer registered in cluster.  Canceling query {}",
+            failedNodeList, QueryIdHelper.getQueryId(queryId));
+        stateListener.moveToState(QueryState.FAILED,
+            new ForemanException(String.format("One more more nodes lost connectivity during query.  Identified nodes were [%s].",
+                failedNodeList)));
+      }
+
+    }
+  };
 }

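The NodeTracker counters above replace the flat includedBits set: each node tracks total versus completed fragments, and nodeDead() drains the remainder through the same fragmentComplete() path so nodeComplete() can fire at most once per node. A self-contained sketch of that counting pattern, with a callback standing in for QueryManager.nodeComplete(), behaves as follows:

    import java.util.concurrent.atomic.AtomicInteger;

    // Minimal stand-alone version of the NodeTracker counting pattern;
    // the onNodeComplete callback stands in for QueryManager.nodeComplete().
    public class CompletionTracker {
      private final AtomicInteger total = new AtomicInteger(0);
      private final AtomicInteger completed = new AtomicInteger(0);
      private final Runnable onNodeComplete;

      public CompletionTracker(final Runnable onNodeComplete) {
        this.onNodeComplete = onNodeComplete;
      }

      public void addFragment() {
        total.incrementAndGet();
      }

      public void fragmentComplete() {
        // incrementAndGet is atomic, so (with total fixed after setup) exactly
        // one caller observes equality and fires the completion callback.
        if (total.get() == completed.incrementAndGet()) {
          onNodeComplete.run();
        }
      }

      // On node death, drain the remaining fragments through the same path;
      // racing status updates just shorten the loop.
      public void nodeDead() {
        while (completed.get() < total.get()) {
          fragmentComplete();
        }
      }

      public static void main(final String[] args) {
        final CompletionTracker t = new CompletionTracker(() -> System.out.println("node complete"));
        t.addFragment();
        t.addFragment();
        t.fragmentComplete();
        t.nodeDead(); // drains the one outstanding fragment; prints once
      }
    }
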
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 4ff28f3..8a40f1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -24,29 +24,29 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public abstract class AbstractStatusReporter implements StatusReporter{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
 
-  private FragmentContext context;
-  private volatile long startNanos;
+  private final FragmentContext context;
 
-  public AbstractStatusReporter(FragmentContext context) {
+  public AbstractStatusReporter(final FragmentContext context) {
     super();
     this.context = context;
   }
 
-  private  FragmentStatus.Builder getBuilder(FragmentState state){
+  private  FragmentStatus.Builder getBuilder(final FragmentState state){
     return getBuilder(context, state, null);
   }
 
-  public static FragmentStatus.Builder getBuilder(FragmentContext context, FragmentState state, UserException ex){
-    FragmentStatus.Builder status = FragmentStatus.newBuilder();
-    MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+  public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){
+    final FragmentStatus.Builder status = FragmentStatus.newBuilder();
+    final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
     context.getStats().addMetricsToStatus(b);
     b.setState(state);
     if(ex != null){
-      boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+      final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
       b.setError(ex.getOrCreatePBError(verbose));
     }
     status.setHandle(context.getHandle());
@@ -57,60 +57,36 @@ public abstract class AbstractStatusReporter implements StatusReporter{
   }
 
   @Override
-  public void stateChanged(FragmentHandle handle, FragmentState newState) {
-    FragmentStatus.Builder status = getBuilder(newState);
-
+  public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
+    final FragmentStatus.Builder status = getBuilder(newState);
+    logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState);
     switch(newState){
     case AWAITING_ALLOCATION:
-      awaitingAllocation(handle, status);
-      break;
+    case CANCELLATION_REQUESTED:
     case CANCELLED:
-      cancelled(handle, status);
-      break;
-    case FAILED:
-      // no op since fail should have also been called.
-      break;
     case FINISHED:
-      finished(handle, status);
-      break;
     case RUNNING:
-      this.startNanos = System.nanoTime();
-      running(handle, status);
+      statusChange(handle, status.build());
       break;
     case SENDING:
       // no op.
       break;
+    case FAILED:
+      // shouldn't get here since fail() should be called.
     default:
-      break;
-
+      throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
     }
   }
 
-  protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void cancelled(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
   protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
 
   @Override
-  public final void fail(FragmentHandle handle, String message, UserException excep) {
-    FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
+  public final void fail(final FragmentHandle handle, final UserException excep) {
+    final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
     fail(handle, status);
   }
 
-  protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+  private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) {
     statusChange(handle, statusBuilder.build());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a4a97c9..3570ba5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.DeferredException;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContext.ExecutorState;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -42,36 +44,42 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
 
-  // TODO:  REVIEW:  Can't this be AtomicReference<FragmentState> (so that
-  // debugging and logging don't show just integer values--and for type safety)?
-  private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+  private final String fragmentName;
   private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
-  private volatile boolean canceled;
-  private volatile boolean closed;
-  private RootExec root;
+  private final DeferredException deferredException = new DeferredException();
 
+  private volatile RootExec root;
+  private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
 
   public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
                           final StatusReporter listener) {
     this.fragmentContext = context;
     this.rootOperator = rootOperator;
     this.listener = listener;
+    this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
+
+    context.setExecutorState(new ExecutorStateImpl());
   }
 
   @Override
   public String toString() {
-    return
-        super.toString()
-        + "[closed = " + closed
-        + ", state = " + state
-        + ", rootOperator = " + rootOperator
-        + ", fragmentContext = " + fragmentContext
-        + ", listener = " + listener
-        + "]";
+    final StringBuilder builder = new StringBuilder();
+    builder.append("FragmentExecutor [fragmentContext=");
+    builder.append(fragmentContext);
+    builder.append(", fragmentState=");
+    builder.append(fragmentState);
+    builder.append("]");
+    return builder.toString();
   }
 
+  /**
+   * Returns the current fragment status if the fragment is running. Otherwise, returns no status.
+   *
+   * @return FragmentStatus or null.
+   */
   public FragmentStatus getStatus() {
     /*
      * If the query is not in a running state, the operator tree is still being constructed and
@@ -81,29 +89,46 @@ public class FragmentExecutor implements Runnable {
      * before this check. This caused a concurrent modification exception as the list of operator
      * stats is iterated over while collecting info, and added to while building the operator tree.
      */
-    if(state.get() != FragmentState.RUNNING_VALUE) {
+    if (fragmentState.get() != FragmentState.RUNNING) {
       return null;
     }
 
-    return AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null).build();
+    final FragmentStatus status = AbstractStatusReporter
+        .getBuilder(fragmentContext, FragmentState.RUNNING, null)
+        .build();
+    return status;
   }
 
+  /**
+   * Cancel the execution of this fragment if it is in an appropriate state. Cancellation messages come from external threads.
+   */
   public void cancel() {
+    acceptExternalEvents.awaitUninterruptibly();
+
     /*
-     * Note that this can be called from threads *other* than the one running this runnable(), so
-     * we need to be careful about the state transitions that can result. We set the canceled flag,
-     * and this is checked in the run() loop, where action will be taken as soon as possible.
-     *
-     * If the run loop has already exited, because we've already either completed or failed the query,
-     * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+     * Note that this can be called from threads *other* than the one running this runnable(), so we need to be careful
+     * about the state transitions that can result. We set the cancel requested flag but the actual cancellation is
+     * managed by the run() loop.
      */
-    canceled = true;
+    updateState(FragmentState.CANCELLATION_REQUESTED);
   }
 
-  public void receivingFragmentFinished(FragmentHandle handle) {
-    cancel();
+  /**
+   * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
+   * called in the case that a limit query is executed.
+   *
+   * @param handle
+   *          The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+   */
+  public void receivingFragmentFinished(final FragmentHandle handle) {
+    acceptExternalEvents.awaitUninterruptibly();
     if (root != null) {
+      logger.info("Applying request for early sender termination for {} -> {}.",
+          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
       root.receivingFragmentFinished(handle);
+    } else {
+      logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
+          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
     }
   }
 
@@ -114,184 +139,204 @@ public class FragmentExecutor implements Runnable {
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
     final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
     final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
+    final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
 
     try {
-      final String newThreadName = String.format("%s:frag:%s:%s",
-          QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
-          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+
       myThread.setName(newThreadName);
 
       root = ImplCreator.getExec(fragmentContext, rootOperator);
+
       clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+      updateState(FragmentState.RUNNING);
+
+      acceptExternalEvents.countDown();
 
       logger.debug("Starting fragment runner. {}:{}",
           fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
-      if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
-        logger.warn("Unable to set fragment state to RUNNING.  Cancelled or failed?");
-        return;
-      }
 
       /*
-       * Run the query until root.next returns false OR cancel() changes the
-       * state.
-       * Note that we closeOutResources() here if we're done.  That's because
-       * this can also throw exceptions that we want to treat as failures of the
-       * request, even if the request did fine up until this point.  Any
-       * failures there will be caught in the catch clause below, which will be
-       * reported to the user.  If they were to come from the finally clause,
-       * the uncaught exception there will simply terminate this thread without
-       * alerting the user--the behavior then is to hang.
+       * Run the query until root.next returns false OR we no longer need to continue.
        */
-      while (state.get() == FragmentState.RUNNING_VALUE) {
-        if (canceled) {
-          logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
-          // Change state checked by main loop to terminate it (if not already done):
-          updateState(FragmentState.CANCELLED);
-
-          fragmentContext.cancel();
-
-          logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
-
-          /*
-           * The state will be altered because of the updateState(), which would cause
-           * us to fall out of the enclosing while loop; we just short-circuit that here
-           */
-          break;
-        }
-
-        if (!root.next()) {
-          if (fragmentContext.isFailed()) {
-            internalFail(fragmentContext.getFailureCause());
-            closeOutResources();
-          } else {
-            /*
-             * Close out resources before we report success. We do this so that we'll get an
-             * error if there's a problem cleaning up, even though the query execution portion
-             * succeeded.
-             */
-            closeOutResources();
-            updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
-          }
-          break;
-        }
+      while (shouldContinue() && root.next()) {
+        // loop
       }
+
+      updateState(FragmentState.FINISHED);
+
     } catch (AssertionError | Exception e) {
-      logger.warn("Error while initializing or executing fragment", e);
-      fragmentContext.fail(e);
-      internalFail(e);
+      fail(e);
     } finally {
-      clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
-      // Final check to make sure RecordBatches are cleaned up.
+      // We need to be sure we countDown at least once. We'll do it here to guarantee that.
+      acceptExternalEvents.countDown();
+
       closeOutResources();
 
+      // send the final state of the fragment. only the main execution thread can send the final state and it can
+      // only be sent once.
+      sendFinalState();
+
+      clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
+
       myThread.setName(originalThreadName);
     }
   }
 
-  private static final String CLOSE_FAILURE = "Failure while closing out resources";
-
-  private void closeOutResources() {
-    /*
-     * Because of the way this method can be called, it needs to be idempotent; it must
-     * be safe to call it more than once. We use this flag to bypass the body if it has
-     * been called before.
-     */
-    synchronized(this) { // synchronize for the state of closed
-      if (closed) {
-        return;
-      }
+  /**
+   * Utility method to check whether we are in a non-terminal state.
+   *
+   * @return Whether or not execution should continue.
+   */
+  private boolean shouldContinue() {
+    return !isCompleted() && FragmentState.CANCELLATION_REQUESTED != fragmentState.get();
+  }
 
-      final DeferredException deferredException = fragmentContext.getDeferredException();
-      try {
-        root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
-      } catch (RuntimeException e) {
-        logger.warn(CLOSE_FAILURE, e);
-        deferredException.addException(e);
-      }
+  /**
+   * Returns true if the fragment is in a terminal state.
+   *
+   * @return Whether the fragment is in a terminal state.
+   */
+  private boolean isCompleted() {
+    return isTerminal(fragmentState.get());
+  }
 
-      closed = true;
+  private void sendFinalState() {
+    final FragmentState outcome = fragmentState.get();
+    if (outcome == FragmentState.FAILED) {
+      final FragmentHandle handle = getContext().getHandle();
+      final UserException uex = UserException.systemError(deferredException.getAndClear())
+          .addIdentity(getContext().getIdentity())
+          .addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId())
+          .build();
+      listener.fail(fragmentContext.getHandle(), uex);
+    } else {
+      listener.stateChanged(fragmentContext.getHandle(), outcome);
     }
+  }
+
+
+  private void closeOutResources() {
 
-    /*
-     * This must be last, because this may throw deferred exceptions.
-     * We are forced to wrap the checked exception (if any) so that it will be unchecked.
-     */
     try {
-      fragmentContext.close();
-    } catch(Exception e) {
-      throw new RuntimeException("Error closing fragment context.", e);
+      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+    } catch (final Exception e) {
+      fail(e);
     }
-  }
 
-  private void internalFail(final Throwable excep) {
-    state.set(FragmentState.FAILED_VALUE);
+    fragmentContext.close();
 
-    UserException uex = UserException.systemError(excep).addIdentity(getContext().getIdentity()).build();
-    listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", uex);
   }
 
-  /**
-   * Updates the fragment state with the given state
-   *
-   * @param  to  target state
-   */
-  private void updateState(final FragmentState to) {
-    state.set(to.getNumber());
-    listener.stateChanged(fragmentContext.getHandle(), to);
+  private void warnStateChange(final FragmentState current, final FragmentState target) {
+    logger.warn("Ignoring unexpected state transition {} => {}.", current.name(), target.name());
   }
 
-  /**
-   * Updates the fragment state only iff the current state matches the expected.
-   *
-   * @param  expected  expected current state
-   * @param  to  target state
-   * @return true only if update succeeds
-   */
-  private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
-    final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
-    if (success) {
-      listener.stateChanged(fragmentContext.getHandle(), to);
-    } else {
-      logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
-          expected.name(), to.name(), FragmentState.valueOf(state.get()));
+  private void errorStateChange(final FragmentState current, final FragmentState target) {
+    final String msg = "Invalid state transition %s => %s.";
+    throw new StateTransitionException(String.format(msg, current.name(), target.name()));
+  }
+
+  private synchronized boolean updateState(FragmentState target) {
+    final FragmentHandle handle = fragmentContext.getHandle();
+    final FragmentState current = fragmentState.get();
+    logger.info(fragmentName + ": State change requested from {} --> {}.", current, target);
+    switch (target) {
+    case CANCELLATION_REQUESTED:
+      switch (current) {
+      case SENDING:
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+        fragmentState.set(target);
+        listener.stateChanged(handle, target);
+        return true;
+
+      default:
+        warnStateChange(current, target);
+        return false;
+      }
+
+    case FINISHED:
+      if(current == FragmentState.CANCELLATION_REQUESTED){
+        target = FragmentState.CANCELLED;
+      }
+      // fall-through
+    case FAILED:
+      if(!isTerminal(current)){
+        fragmentState.set(target);
+        // don't notify listener until we finalize this terminal state.
+        return true;
+      } else if (current == FragmentState.FAILED) {
+        // no warn since we can call fail multiple times.
+        return false;
+      } else if (current == FragmentState.CANCELLED && target == FragmentState.FAILED) {
+        fragmentState.set(FragmentState.FAILED);
+        return true;
+      }else{
+        warnStateChange(current, target);
+        return false;
+      }
+
+    case RUNNING:
+      if(current == FragmentState.AWAITING_ALLOCATION){
+        fragmentState.set(target);
+        listener.stateChanged(handle, target);
+        return true;
+      }else{
+        errorStateChange(current, target);
+      }
+
+    // these should never be requested.
+    case SENDING:
+    case AWAITING_ALLOCATION:
+    case CANCELLED:
+    default:
+      errorStateChange(current, target);
     }
-    return success;
+
+    // errorStateChange() always throws, so this line should never be reached
+    throw new IllegalStateException();
   }
 
-  /**
-   * Returns true if the fragment is in a terminal state
-   */
-  private boolean isCompleted() {
-    return state.get() == FragmentState.CANCELLED_VALUE
-        || state.get() == FragmentState.FAILED_VALUE
-        || state.get() == FragmentState.FINISHED_VALUE;
+  private boolean isTerminal(final FragmentState state) {
+    return state == FragmentState.CANCELLED
+        || state == FragmentState.FAILED
+        || state == FragmentState.FINISHED;
   }
 
   /**
-   * Update the state if current state matches expected or fail the fragment if state transition fails even though
-   * fragment is not in a terminal state.
+   * Capture an exception and store it. Update state to failed status (if not already there). Does not immediately
+   * report status back to the Foreman. Only the original thread can return status to the Foreman.
    *
-   * @param expected  current expected state
-   * @param to  target state
-   * @return true only if update succeeds
+   * @param excep
+   *          The failure that occurred.
    */
-  private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
-    final boolean updated = checkAndUpdateState(expected, to);
-    if (!updated && !isCompleted()) {
-      final String msg = "State was different than expected while attempting to update state from %s to %s"
-          + "however current state was %s.";
-      internalFail(new StateTransitionException(
-          String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
-    }
-    return updated;
+  private void fail(final Throwable excep) {
+    deferredException.addThrowable(excep);
+    updateState(FragmentState.FAILED);
   }
 
   public FragmentContext getContext() {
     return fragmentContext;
   }
 
+  private class ExecutorStateImpl implements ExecutorState {
+    public boolean shouldContinue() {
+      return FragmentExecutor.this.shouldContinue();
+    }
+
+    public void fail(final Throwable t) {
+      FragmentExecutor.this.fail(t);
+    }
+
+    public boolean isFailed() {
+      return fragmentState.get() == FragmentState.FAILED;
+    }
+    public Throwable getFailureCause(){
+      return deferredException.getException();
+    }
+  }
+
   private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
     @Override
     public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {

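FragmentExecutor now keeps its lifecycle in an AtomicReference<FragmentState> and funnels every transition through the synchronized updateState(), where benign late requests (such as a cancel arriving after completion) are logged and ignored while impossible ones throw. Reduced to a self-contained example with a made-up three-state machine, the pattern is:

    import java.util.concurrent.atomic.AtomicReference;

    // Reduced model of the updateState() pattern: one synchronized choke point,
    // benign late requests are ignored with a warning, impossible ones throw.
    // The RUNNING/CANCEL_REQUESTED/DONE states here are illustrative only.
    public class LifecycleExample {
      enum State { RUNNING, CANCEL_REQUESTED, DONE }

      private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

      public synchronized boolean updateState(final State target) {
        final State current = state.get();
        switch (target) {
        case CANCEL_REQUESTED:
          if (current == State.RUNNING) {
            state.set(target);
            return true;
          }
          System.out.println("ignoring late cancel in state " + current); // benign
          return false;
        case DONE:
          if (current != State.DONE) {
            state.set(target);
            return true;
          }
          return false;
        default:
          // nobody should ever request RUNNING after construction
          throw new IllegalStateException(current + " => " + target);
        }
      }

      // The run loop polls the reference instead of a boolean 'canceled' flag.
      public boolean shouldContinue() {
        return state.get() == State.RUNNING;
      }
    }
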
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 7a819c4..0ba91b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 
-import java.io.IOException;
-
 /**
  * The Fragment Manager is responsible for managing incoming data and executing a fragment. Once enough data and resources
  * are available, a fragment manager will start a fragment executor to run the associated fragment.
@@ -57,6 +57,8 @@ public interface FragmentManager {
 
   public abstract void addConnection(RemoteConnection connection);
 
+  public void receivingFragmentFinished(final FragmentHandle handle);
+
   /**
    *  Sets autoRead property on all connections
    * @param autoRead

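receivingFragmentFinished() is the plumbing for the early sender termination described in the FragmentExecutor javadoc above: a downstream fragment that has satisfied its LIMIT tells its upstream senders to stop producing. A toy sketch of the sending side of that idea, with all names invented and batch transmission elided:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of early sender termination: a sending operator stops
    // producing for receivers that have said "no more records needed".
    public class Sender {
      private final Set<Integer> finishedReceivers = ConcurrentHashMap.newKeySet();

      // Called from an external event, cf. receivingFragmentFinished(handle).
      public void receiverFinished(final int receiverId) {
        finishedReceivers.add(receiverId);
      }

      public void sendBatch(final int receiverId, final Object batch) {
        if (finishedReceivers.contains(receiverId)) {
          return; // drop work for a receiver that no longer wants records
        }
        System.out.println("sending batch to receiver " + receiverId);
      }
    }
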
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 41e87cd..a5b928b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -33,19 +33,23 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.foreman.ForemanException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This manager determines when to run a non-root fragment node.
  */
 // TODO a lot of this is the same as RootFragmentManager
 public class NonRootFragmentManager implements FragmentManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
+
   private final PlanFragment fragment;
   private FragmentRoot root;
   private final IncomingBuffers buffers;
-  private final StatusReporter runnerListener;
-  private volatile FragmentExecutor runner;
+  private final FragmentExecutor runner;
   private volatile boolean cancel = false;
   private final FragmentContext context;
-  private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+  private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+  private volatile boolean runnerRetrieved = false;
 
   public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
       throws ExecutionSetupException {
@@ -54,8 +58,10 @@ public class NonRootFragmentManager implements FragmentManager {
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
+      final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
+          fragment.getForeman()));
+      this.runner = new FragmentExecutor(this.context, root, reporter);
       this.context.setBuffers(buffers);
-      this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
 
     } catch (ForemanException | IOException e) {
       throw new FragmentSetupException("Failure while decoding fragment.", e);
@@ -66,7 +72,7 @@ public class NonRootFragmentManager implements FragmentManager {
    * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
    */
   @Override
-  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException {
+  public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
     return buffers.batchArrived(batch);
   }
 
@@ -76,16 +82,22 @@ public class NonRootFragmentManager implements FragmentManager {
   @Override
   public FragmentExecutor getRunnable() {
     synchronized(this) {
-      if (runner != null) {
-        throw new IllegalStateException("Get Runnable can only be run once.");
-      }
+
+      // historically, we had issues where we tried to run the same fragment multiple times. Let's check to make sure
+      // this isn't happening.
+      Preconditions.checkArgument(!runnerRetrieved, "Get Runnable can only be run once.");
+
       if (cancel) {
         return null;
       }
-      runner = new FragmentExecutor(context, root, runnerListener);
+      runnerRetrieved = true;
       return runner;
     }
+  }
 
+  @Override
+  public void receivingFragmentFinished(final FragmentHandle handle) {
+    runner.receivingFragmentFinished(handle);
   }
 
   /* (non-Javadoc)
@@ -95,9 +107,7 @@ public class NonRootFragmentManager implements FragmentManager {
   public void cancel() {
     synchronized(this) {
       cancel = true;
-      if (runner != null) {
-        runner.cancel();
-      }
+      runner.cancel();
     }
   }
 
@@ -117,13 +127,13 @@ public class NonRootFragmentManager implements FragmentManager {
   }
 
   @Override
-  public void addConnection(RemoteConnection connection) {
+  public void addConnection(final RemoteConnection connection) {
     connections.add(connection);
   }
 
   @Override
-  public void setAutoRead(boolean autoRead) {
-    for (RemoteConnection c : connections) {
+  public void setAutoRead(final boolean autoRead) {
+    for (final RemoteConnection c : connections) {
       c.setAutoRead(autoRead);
     }
   }

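NonRootFragmentManager now constructs its FragmentExecutor eagerly and hands it out at most once; the runnerRetrieved flag turns a second retrieval into an immediate failure rather than a duplicate execution. In isolation, and without the Guava Preconditions dependency, the guard amounts to something like:

    // Standalone version of the one-shot hand-off guard; Runnable stands in
    // for the eagerly constructed FragmentExecutor.
    public class OneShotHandoff {
      private final Runnable runner;
      private boolean retrieved = false;
      private boolean cancelled = false;

      public OneShotHandoff(final Runnable runner) {
        this.runner = runner;
      }

      public synchronized Runnable getRunnable() {
        if (retrieved) {
          // historically the same fragment was sometimes run twice; fail fast instead.
          throw new IllegalArgumentException("Get Runnable can only be run once.");
        }
        if (cancelled) {
          return null; // cancelled before anyone asked to run it
        }
        retrieved = true;
        return runner;
      }

      public synchronized void cancel() {
        cancelled = true;
      }
    }
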

[2/7] drill git commit: DRILL-2803: Fix compound hash function to use seed value for hash if additional value is null.

Posted by ja...@apache.org.
DRILL-2803: Fix compound hash function to use seed value for hash if additional value is null.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/071ed89b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/071ed89b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/071ed89b

Branch: refs/heads/master
Commit: 071ed89b10b3b5b49ac0286f4717d74431e1efb3
Parents: cf15546
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 15 19:45:56 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 09:11:12 2015 -0700

----------------------------------------------------------------------
 .../expr/fn/impl/Hash64FunctionsWithSeed.java   | 32 ++++++++++----------
 .../expr/fn/impl/Hash64WithSeedAsDouble.java    | 18 +++++------
 2 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

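The one-line change repeated throughout both files below is semantic rather than cosmetic: these seeded hash functions are chained to build a compound row hash, with each column's output feeding in as the next column's seed. A null column therefore has to pass the incoming seed through unchanged; returning 0 discarded everything hashed so far, so rows differing only in columns before the null would collide. A toy illustration of the chaining, using an arbitrary stand-in mixer rather than Drill's XXHash:

    // Toy demonstration of why a null input must yield the seed, not 0.
    // mix64() is an arbitrary stand-in for XXHash.hash64(value, seed).
    public class SeededHashDemo {
      static long mix64(final long value, final long seed) {
        long h = seed ^ (value * 0x9E3779B97F4A7C15L);
        h ^= h >>> 33;
        h *= 0xC2B2AE3D27D4EB4FL;
        h ^= h >>> 29;
        return h;
      }

      // Per-column hash: null contributes the running seed unchanged (the fix);
      // returning 0 instead would erase everything hashed so far.
      static long hashColumn(final Long value, final long seed) {
        return value == null ? seed : mix64(value, seed);
      }

      static long hashRow(final Long... columns) {
        long h = 0;
        for (final Long c : columns) {
          h = hashColumn(c, h); // previous hash feeds in as the next seed
        }
        return h;
      }

      public static void main(final String[] args) {
        // With the fix, these two rows hash differently, as they should:
        System.out.println(hashRow(1L, null, 3L));
        System.out.println(hashRow(2L, null, 3L));
        // Under the old behavior (null => 0) both would reduce to the hash of
        // the trailing 3L alone, colliding despite differing first columns.
      }
    }
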

http://git-wip-us.apache.org/repos/asf/drill/blob/071ed89b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
index 3a1edd3..e162b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.expr.fn.impl;
 
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.Output;
 import org.apache.drill.exec.expr.annotations.Param;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -70,7 +70,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -106,7 +106,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -142,7 +142,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
       }
@@ -162,7 +162,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
       }
@@ -182,7 +182,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
       }
@@ -202,7 +202,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       }
       else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
@@ -222,7 +222,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       }
       else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
@@ -336,7 +336,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -370,7 +370,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -404,7 +404,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -438,7 +438,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -472,7 +472,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -506,7 +506,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.start + NullableDecimal28SparseHolder.WIDTH, in.buffer, seed.value);
       }
@@ -540,7 +540,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.start + NullableDecimal38SparseHolder.WIDTH, in.buffer, seed.value);
       }
@@ -560,7 +560,7 @@ public class Hash64FunctionsWithSeed {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/071ed89b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
index 0cbac1b..6922c31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.expr.fn.impl;
 
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.Output;
 import org.apache.drill.exec.expr.annotations.Param;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.Decimal18Holder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
@@ -61,7 +61,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
       }
@@ -97,7 +97,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
       }
@@ -133,7 +133,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       }
       else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
@@ -153,7 +153,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       }
       else {
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
@@ -221,7 +221,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         java.math.BigDecimal input = new java.math.BigDecimal(java.math.BigInteger.valueOf(in.value), in.scale);
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(input.doubleValue(), seed.value);
@@ -257,7 +257,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         java.math.BigDecimal input = new java.math.BigDecimal(java.math.BigInteger.valueOf(in.value), in.scale);
         out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(input.doubleValue(), seed.value);
@@ -294,7 +294,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         java.math.BigDecimal input = org.apache.drill.exec.util.DecimalUtility.getBigDecimalFromSparse(in.buffer,
             in.start, in.nDecimalDigits, in.scale);
@@ -332,7 +332,7 @@ public class Hash64WithSeedAsDouble {
 
     public void eval() {
       if (in.isSet == 0) {
-        out.value = 0;
+        out.value = seed.value;
       } else {
         java.math.BigDecimal input = org.apache.drill.exec.util.DecimalUtility.getBigDecimalFromSparse(in.buffer,
             in.start, in.nDecimalDigits, in.scale);
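
The repeated change above is the substance of the fix: when a nullable input is not set, the function must return the incoming seed unchanged instead of collapsing to 0, so that hashing a compound key stays consistent when some columns are null. A sketch of the idea with a stand-in mix function (not Drill's actual XXHash implementation):

    // Null passes the seed through; non-null values mix into the running hash.
    static long hashColumn(final Long value, final long seed) {
      if (value == null) {
        return seed;              // previously 0, which corrupted compound hashes
      }
      return mix(value, seed);    // stand-in for XXHash.hash64(value, seed)
    }

    static long mix(final long value, final long seed) {
      long h = seed ^ (value * 0x9E3779B97F4A7C15L);
      return h ^ (h >>> 32);
    }

    // Hashing a two-column key (a, b) by chaining seeds:
    //   long h = hashColumn(a, 0L);
    //   h = hashColumn(b, h);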


[7/7] drill git commit: DRILL-2825: Bump test memory to 3g due to sporadic OOM on TestLargeFileCompilation.

Posted by ja...@apache.org.
DRILL-2825: Bump test memory to 3g due to sporadic OOM on TestLargeFileCompilation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ec257ef
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ec257ef
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ec257ef

Branch: refs/heads/master
Commit: 9ec257efb7992209e27c82e6f4ee8a5b12cc95e4
Parents: c0d5a69
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Apr 18 09:19:48 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 18:01:45 2015 -0700

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9ec257ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f6bcd91..3b246d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -365,7 +365,7 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.17</version>
           <configuration>
-            <argLine>-Xms512m -Xmx2g -Ddrill.exec.http.enabled=false
+            <argLine>-Xms512m -Xmx3g -Ddrill.exec.http.enabled=false
               -Ddrill.exec.sys.store.provider.local.write=false
               -Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
               -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=3072M


[6/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection

Posted by ja...@apache.org.
DRILL-2762: Update Fragment state reporting and error collection

DeferredException
- Add new throwAndClear operation to allow checking for exceptions during preClose in FragmentContext
- Add new getAndClear operation

BufferManager
- Ensure close() can be called multiple times by clearing managed buffer list on close().

FragmentContext/FragmentExecutor
- Update FragmentContext to have a preClose so that we can check closure state before doing final close.
- Update so that there is only a single state maintained between FragmentContext and FragmentExecutor
- Clean up FragmentExecutor run() method to better manage error states and have only a single terminal point (avoiding multiple messages to Foreman).
- Add new CANCELLATION_REQUESTED state for FragmentState.
- Move all users of isCancelled or isFailed in main code to use shouldContinue() (see the sketch after this list)
- Update receivingFragmentFinished message to not cancel fragment (only inform root operator of cancellation)

WorkManager Updates
- Add new afterExecute command to the WorkManager ExecutorService so that we get log entries if a thread leaks an exception.  (Otherwise logs don't show these exceptions and they only go to standard out.)

Profile Page
- Update profile page to show last update and last progress.
- Change durations to non-time presentation

Foreman/QueryManager
- Extract listenable interfaces into anonymous inner classes from body of Foreman

QueryManager
- Update QueryManager to track completed nodes rather than completed fragments using NodeTracker
- Update DrillbitStatusListener to decrement expected completion messages on Nodes that have died to avoid query hang when a node dies

FragmentData/MinorFragmentProfile
- Add ability to track last status update as well as last time fragment made progress

AbstractRecordBatch
- Update awareness of current cancellation state to avoid cancellation delays

Misc. Other changes
- Move ByteCode optimization code to only record assembly and code as trace messages
- Update SimpleRootExec to create fake ExecutorState to make existing tests work.
- Update sort to exit prematurely in the case that the fragment was asked to cancel.
- Add finals to all edited files.
- Modify control handler and FragmentManager to directly support receivingFragmentFinished
- Update receiver propagation message to avoid premature removal of fragment manager
- Update UserException.Builder to log a message if we're creating a new UserException (ERROR for System, INFO otherwise).
- Update Profile pages to use min and max instead of sorts.
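
As referenced in the shouldContinue() item above, the polling pattern long-running operations are expected to follow looks roughly like this (illustrative sketch; hasMoreBatches() and sortNextBatch() are hypothetical stand-ins for real operator work):

    void sortAllBatches(final FragmentContext context) {
      while (hasMoreBatches()) {
        if (!context.shouldContinue()) {
          return;   // cancellation or failure requested; exit prematurely
        }
        sortNextBatch();
      }
    }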


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c0d5a693
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c0d5a693
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c0d5a693

Branch: refs/heads/master
Commit: c0d5a693ac70c42019b5841eea91252f9eaa7792
Parents: 071ed89
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 8 19:06:03 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 17:56:21 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/common/DeferredException.java  |  47 ++-
 .../drill/common/concurrent/ExtendedLatch.java  |  89 +++++
 .../drill/common/exceptions/UserException.java  |  66 ++--
 .../org/apache/drill/exec/compile/AsmUtil.java  |  28 +-
 .../compile/bytecode/InstructionModifier.java   |  28 +-
 .../compile/bytecode/ScalarReplacementNode.java |  10 +-
 .../apache/drill/exec/ops/BufferManager.java    |   1 +
 .../apache/drill/exec/ops/FragmentContext.java  | 143 ++++----
 .../drill/exec/physical/impl/BaseRootExec.java  |  10 +-
 .../impl/mergereceiver/MergingRecordBatch.java  | 143 ++++----
 .../impl/producer/ProducerConsumerBatch.java    |  48 ++-
 .../impl/project/ProjectRecordBatch.java        | 250 +++++++------
 .../UnorderedReceiverBatch.java                 |  39 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   4 +
 .../exec/physical/impl/xsort/MSortTemplate.java |  43 ++-
 .../drill/exec/proto/helper/QueryIdHelper.java  |  23 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  26 +-
 .../exec/record/FragmentWritableBatch.java      |  50 ++-
 .../exec/rpc/data/DataResponseHandlerImpl.java  |  11 +-
 .../org/apache/drill/exec/server/Drillbit.java  |  16 +-
 .../exec/server/rest/profile/Comparators.java   |  34 +-
 .../server/rest/profile/FragmentWrapper.java    |  73 ++--
 .../server/rest/profile/OperatorWrapper.java    |  26 +-
 .../server/rest/profile/ProfileWrapper.java     |  49 +--
 .../exec/server/rest/profile/TableBuilder.java  |  77 ++--
 .../org/apache/drill/exec/work/WorkManager.java |  34 +-
 .../exec/work/batch/ControlHandlerImpl.java     |  20 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  37 +-
 .../apache/drill/exec/work/foreman/Foreman.java | 127 +++----
 .../drill/exec/work/foreman/FragmentData.java   |  77 +++-
 .../drill/exec/work/foreman/QueryManager.java   | 292 ++++++++++-----
 .../work/fragment/AbstractStatusReporter.java   |  62 +---
 .../exec/work/fragment/FragmentExecutor.java    | 357 +++++++++++--------
 .../exec/work/fragment/FragmentManager.java     |   6 +-
 .../work/fragment/NonRootFragmentManager.java   |  40 ++-
 .../exec/work/fragment/RootFragmentManager.java |  17 +-
 .../work/fragment/StateTransitionException.java |  12 +-
 .../exec/work/fragment/StatusReporter.java      |   2 +-
 .../exec/physical/impl/SimpleRootExec.java      |  40 ++-
 .../work/batch/TestUnlimitedBatchBuffer.java    |   4 +-
 .../drill/exec/proto/SchemaUserBitShared.java   |  14 +
 .../apache/drill/exec/proto/UserBitShared.java  | 249 +++++++++++--
 .../drill/exec/proto/beans/FragmentState.java   |   4 +-
 .../exec/proto/beans/MinorFragmentProfile.java  |  44 +++
 protocol/src/main/protobuf/UserBitShared.proto  |   3 +
 45 files changed, 1718 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/DeferredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/DeferredException.java b/common/src/main/java/org/apache/drill/common/DeferredException.java
index 99f18f1..c7111a9 100644
--- a/common/src/main/java/org/apache/drill/common/DeferredException.java
+++ b/common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -71,9 +71,32 @@ public class DeferredException implements AutoCloseable {
    *
    * @return the deferred exception, or null
    */
-  public Exception getException() {
-    synchronized(this) {
-      return exception;
+  public synchronized Exception getException() {
+    return exception;
+  }
+
+  public synchronized Exception getAndClear() {
+    Preconditions.checkState(!isClosed);
+
+    if (exception != null) {
+      final Exception local = exception;
+      exception = null;
+      return local;
+    }
+
+    return null;
+  }
+
+  /**
+   * If an exception exists, will throw the exception and then clear it. This is so that in cases where we want to
+   * reuse the DeferredException, we don't double-report the same exception.
+   *
+   * @throws Exception
+   */
+  public synchronized void throwAndClear() throws Exception {
+    final Exception e = getAndClear();
+    if (e != null) {
+      throw e;
     }
   }
 
@@ -98,24 +121,18 @@ public class DeferredException implements AutoCloseable {
 
       try {
         autoCloseable.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         addException(e);
       }
     }
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized(this) {
-      Preconditions.checkState(!isClosed);
-
-      try {
-        if (exception != null) {
-          throw exception;
-        }
-      } finally {
-        isClosed = true;
-      }
+  public synchronized void close() throws Exception {
+    try {
+      throwAndClear();
+    } finally {
+      isClosed = true;
     }
   }
 }
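
A sketch of how the new getAndClear/throwAndClear operations allow a DeferredException to be reused across phases without double-reporting (someCloseable and handle() are illustrative; the real call sites are in FragmentContext and FragmentExecutor):

    final DeferredException deferred = new DeferredException();

    deferred.suppressingClose(someCloseable);   // failures are recorded, not thrown

    // Phase boundary: surface anything recorded so far, then keep using the object.
    try {
      deferred.throwAndClear();
    } catch (final Exception e) {
      handle(e);
    }

    deferred.close();   // terminal close; throws only if a new error arrived since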

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
new file mode 100644
index 0000000..a75ac32
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An extended CountDownLatch which allows us to await uninterruptibly.
+ */
+public class ExtendedLatch extends CountDownLatch {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExtendedLatch.class);
+
+  public ExtendedLatch() {
+    super(1);
+  }
+
+  public ExtendedLatch(final int count) {
+    super(count);
+  }
+
+  /**
+   * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
+   * state variable or similar.
+   *
+   * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
+   */
+  protected boolean ignoreInterruptions() {
+    return true;
+  }
+
+  /**
+   * Await without interruption for a given time.
+   * @param waitMillis
+   *          Time in milliseconds to wait
+   * @return Whether the countdown reached zero or not.
+   */
+  public boolean awaitUninterruptibly(long waitMillis) {
+    final long targetMillis = System.currentTimeMillis() + waitMillis;
+    while (System.currentTimeMillis() < targetMillis) {
+      final long wait = targetMillis - System.currentTimeMillis();
+      if (wait < 1) {
+        return false;
+      }
+
+      try {
+        return await(wait, TimeUnit.MILLISECONDS);
+      } catch (final InterruptedException e) {
+        // if we weren't ready, the while loop will continue to wait
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
+   * output of ignoreInterruptions().
+   */
+  public void awaitUninterruptibly() {
+    while (true) {
+      try {
+        await();
+        return;
+      } catch (final InterruptedException e) {
+        if (ignoreInterruptions()) {
+          // if we're still not ready, the while loop will cause us to wait again
+          logger.warn("Interrupted while waiting for event latch.", e);
+        } else {
+          return;
+        }
+      }
+    }
+  }
+}
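
Typical use of the new latch might look like the following (fragmentCount, logger, and the 30 second timeout are illustrative):

    final ExtendedLatch latch = new ExtendedLatch(fragmentCount);

    // ... each completing fragment calls latch.countDown() ...

    if (!latch.awaitUninterruptibly(30000)) {   // bounded wait, in milliseconds
      logger.warn("Timed out waiting for fragments to complete.");
    }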

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index e995346..9283339 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -53,7 +53,7 @@ public class UserException extends DrillRuntimeException {
    *             Rpc layer or UserResultListener.submitFailed()
    */
   @Deprecated
-  public static Builder systemError(Throwable cause) {
+  public static Builder systemError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.SYSTEM, cause);
   }
 
@@ -79,7 +79,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder connectionError(Throwable cause) {
+  public static Builder connectionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.CONNECTION, cause);
   }
 
@@ -105,7 +105,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder dataReadError(Throwable cause) {
+  public static Builder dataReadError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.DATA_READ, cause);
   }
 
@@ -131,7 +131,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder dataWriteError(Throwable cause) {
+  public static Builder dataWriteError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.DATA_WRITE, cause);
   }
 
@@ -157,7 +157,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder functionError(Throwable cause) {
+  public static Builder functionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.FUNCTION, cause);
   }
 
@@ -183,7 +183,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder parseError(Throwable cause) {
+  public static Builder parseError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PARSE, cause);
   }
 
@@ -209,7 +209,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder permissionError(Throwable cause) {
+  public static Builder permissionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PERMISSION, cause);
   }
 
@@ -235,7 +235,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder planError(Throwable cause) {
+  public static Builder planError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PLAN, cause);
   }
 
@@ -261,7 +261,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder resourceError(Throwable cause) {
+  public static Builder resourceError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.RESOURCE, cause);
   }
 
@@ -287,7 +287,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder unsupportedError(Throwable cause) {
+  public static Builder unsupportedError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.UNSUPPORTED_OPERATION, cause);
   }
 
@@ -313,7 +313,7 @@ public class UserException extends DrillRuntimeException {
      *                  or doesn't wrap a user exception
      * @param cause exception to wrap inside a user exception. Can be null
      */
-    private Builder(DrillPBError.ErrorType errorType, Throwable cause) {
+    private Builder(final DrillPBError.ErrorType errorType, final Throwable cause) {
       this.cause = cause;
 
       //TODO handle the improbable case where cause is a SYSTEM exception ?
@@ -339,7 +339,7 @@ public class UserException extends DrillRuntimeException {
      * @param args Arguments referenced by the format specifiers in the format string
      * @return this builder
      */
-    public Builder message(String format, Object... args) {
+    public Builder message(final String format, final Object... args) {
       // we can't replace the message of a user exception
       if (uex == null && format != null) {
         this.message = String.format(format, args);
@@ -353,7 +353,7 @@ public class UserException extends DrillRuntimeException {
      *
      * @param endpoint drillbit endpoint identity
      */
-    public Builder addIdentity(CoordinationProtos.DrillbitEndpoint endpoint) {
+    public Builder addIdentity(final CoordinationProtos.DrillbitEndpoint endpoint) {
       context.add(endpoint);
       return this;
     }
@@ -363,7 +363,7 @@ public class UserException extends DrillRuntimeException {
      * @param value string line
      * @return this builder
      */
-    public Builder addContext(String value) {
+    public Builder addContext(final String value) {
       context.add(value);
       return this;
     }
@@ -375,7 +375,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, String value) {
+    public Builder addContext(final String name, final String value) {
       context.add(name, value);
       return this;
     }
@@ -387,7 +387,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, long value) {
+    public Builder addContext(final String name, final long value) {
       context.add(name, value);
       return this;
     }
@@ -399,7 +399,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, double value) {
+    public Builder addContext(final String name, final double value) {
       context.add(name, value);
       return this;
     }
@@ -410,7 +410,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String value) {
+    public Builder pushContext(final String value) {
       context.push(value);
       return this;
     }
@@ -422,7 +422,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, String value) {
+    public Builder pushContext(final String name, final String value) {
       context.push(name, value);
       return this;
     }
@@ -434,7 +434,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, long value) {
+    public Builder pushContext(final String name, final long value) {
       context.push(name, value);
       return this;
     }
@@ -446,7 +446,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, double value) {
+    public Builder pushContext(final String name, final double value) {
       context.push(name, value);
       return this;
     }
@@ -462,7 +462,19 @@ public class UserException extends DrillRuntimeException {
         return uex;
       }
 
-      return new UserException(this);
+      final UserException newException = new UserException(this);
+
+      // since we just created a new exception, we should log it for later reference. If this is a system error, this is
+      // an issue that the Drill admin should pay attention to and we should log as ERROR. However, if this is a user
+      // mistake or data read issue, the system admin should not be concerned about these and thus we'll log this
+      // as an INFO message.
+      if (errorType == DrillPBError.ErrorType.SYSTEM) {
+        logger.error(newException.getMessage(), newException);
+      } else {
+        logger.info("User Error Occurred", newException);
+      }
+
+      return newException;
     }
   }
 
@@ -470,14 +482,14 @@ public class UserException extends DrillRuntimeException {
 
   private final UserExceptionContext context;
 
-  protected UserException(DrillPBError.ErrorType errorType, String message, Throwable cause) {
+  protected UserException(final DrillPBError.ErrorType errorType, final String message, final Throwable cause) {
     super(message, cause);
 
     this.errorType = errorType;
     this.context = new UserExceptionContext();
   }
 
-  private UserException(Builder builder) {
+  private UserException(final Builder builder) {
     super(builder.message, builder.cause);
     this.errorType = builder.errorType;
     this.context = builder.context;
@@ -516,10 +528,10 @@ public class UserException extends DrillRuntimeException {
    * @param verbose should the error object contain the verbose error message ?
    * @return protobuf error object
    */
-  public DrillPBError getOrCreatePBError(boolean verbose) {
-    String message = verbose ? getVerboseMessage() : getMessage();
+  public DrillPBError getOrCreatePBError(final boolean verbose) {
+    final String message = verbose ? getVerboseMessage() : getMessage();
 
-    DrillPBError.Builder builder = DrillPBError.newBuilder();
+    final DrillPBError.Builder builder = DrillPBError.newBuilder();
     builder.setErrorType(errorType);
     builder.setErrorId(context.getErrorId());
     if (context.getEndpoint() != null) {
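
With the build() change above, every newly constructed UserException is logged exactly once at creation time (ERROR for SYSTEM errors, INFO otherwise). A representative builder call (ioException, path, and offset are illustrative values):

    throw UserException.dataReadError(ioException)
        .message("Failure while reading file %s", path)
        .addContext("File offset", offset)
        .build();   // logs at INFO here, since DATA_READ is not a SYSTEM error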

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
index 81904df..5e7a9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
@@ -54,7 +54,7 @@ public class AsmUtil {
     final ClassReader ver = new ClassReader(verifyWriter.toByteArray());
     try {
       DrillCheckClassAdapter.verify(ver, false, new PrintWriter(sw));
-    } catch(Exception e) {
+    } catch(final Exception e) {
       logger.info("Caught exception verifying class:");
       logClass(logger, logTag, classNode);
       throw e;
@@ -98,19 +98,25 @@ public class AsmUtil {
   /**
    * Write a class to the log.
    *
-   * <p>Writes at level DEBUG.
+   * <p>
+   * Writes at level TRACE.
    *
-   * @param logger the logger to write to
-   * @param logTag a tag to print to the log
-   * @param classNode the class
+   * @param logger
+   *          the logger to write to
+   * @param logTag
+   *          a tag to print to the log
+   * @param classNode
+   *          the class
    */
   public static void logClass(final Logger logger, final String logTag, final ClassNode classNode) {
-    logger.debug(logTag);
-    final StringWriter stringWriter = new StringWriter();
-    final PrintWriter printWriter = new PrintWriter(stringWriter);
-    final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
-    classNode.accept(traceClassVisitor);
-    logger.debug(stringWriter.toString());
+    if (logger.isTraceEnabled()) {
+      logger.trace(logTag);
+      final StringWriter stringWriter = new StringWriter();
+      final PrintWriter printWriter = new PrintWriter(stringWriter);
+      final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
+      classNode.accept(traceClassVisitor);
+      logger.trace(stringWriter.toString());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
index 6c0292e..d25b1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
@@ -51,12 +51,6 @@ public class InstructionModifier extends MethodVisitor {
 
   private int stackIncrease = 0; // how much larger we have to make the stack
 
-  /*
-   * True if we've transferred holder members to locals. If so, then the maximum stack
-   * size must be increased by one to accommodate the extra DUP that is used to do so.
-   */
-  private boolean transferredToLocals = false;
-
   public InstructionModifier(final int access, final String name, final String desc,
       final String signature, final String[] exceptions, final TrackingInstructionList list,
       final MethodVisitor inner) {
@@ -116,7 +110,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitInsn(int opcode) {
+  public void visitInsn(final int opcode) {
     switch (opcode) {
     case Opcodes.DUP:
       /*
@@ -276,7 +270,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitTypeInsn(int opcode, String type) {
+  public void visitTypeInsn(final int opcode, final String type) {
     /*
      * This includes NEW, NEWARRAY, CHECKCAST, or INSTANCEOF.
      *
@@ -285,9 +279,9 @@ public class InstructionModifier extends MethodVisitor {
      * replaced the values for those, but we might find other reasons to replace
      * things, in which case this will be too broad.
      */
-    ReplacingBasicValue r = getFunctionReturn();
+    final ReplacingBasicValue r = getFunctionReturn();
     if (r != null) {
-      ValueHolderSub sub = r.getIden().getHolderSub(adder);
+      final ValueHolderSub sub = r.getIden().getHolderSub(adder);
       oldToNew.put(r.getIndex(), sub);
     } else {
       super.visitTypeInsn(opcode, type);
@@ -295,7 +289,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitLineNumber(int line, Label start) {
+  public void visitLineNumber(final int line, final Label start) {
     lastLineNumber = line;
     super.visitLineNumber(line, start);
   }
@@ -347,7 +341,7 @@ public class InstructionModifier extends MethodVisitor {
 
   @Override
   public void visitMaxs(final int maxStack, final int maxLocals) {
-    super.visitMaxs(maxStack + stackIncrease + (transferredToLocals ? 1 : 0), maxLocals);
+    super.visitMaxs(maxStack + stackIncrease, maxLocals);
   }
 
   @Override
@@ -397,7 +391,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+  public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc) {
     /*
      * This method was deprecated in the switch from api version ASM4 to ASM5.
      * If we ever go back (via CompilationConfig.ASM_API_VERSION), we need to
@@ -408,7 +402,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) {
+  public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc, final boolean itf) {
     // this version of visitMethodInsn() came after ASM4
     assert CompilationConfig.ASM_API_VERSION != Opcodes.ASM4;
 
@@ -471,7 +465,7 @@ public class InstructionModifier extends MethodVisitor {
 
   @Override
   public void visitEnd() {
-    if (logger.isDebugEnabled()) {
+    if (logger.isTraceEnabled()) {
       final StringBuilder sb = new StringBuilder();
       sb.append("InstructionModifier ");
       sb.append(name);
@@ -488,7 +482,7 @@ public class InstructionModifier extends MethodVisitor {
       int itemCount = 0; // counts up the number of items found
       final HashMap<ValueHolderIden, Integer> seenIdens = new HashMap<>(); // iden -> idenId
       sb.append(" .oldToNew:\n");
-      for (IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
+      for (final IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
         final ValueHolderIden iden = ioc.value.iden();
         if (!seenIdens.containsKey(iden)) {
           seenIdens.put(iden, ++idenId);
@@ -501,7 +495,7 @@ public class InstructionModifier extends MethodVisitor {
       }
 
       sb.append(" .oldLocalToFirst:\n");
-      for (IntIntCursor iic : oldLocalToFirst) {
+      for (final IntIntCursor iic : oldLocalToFirst) {
         sb.append("  " + iic.key + " => " + iic.value + '\n');
         ++itemCount;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
index 6e981bc..adbf2fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
@@ -60,14 +60,14 @@ public class ScalarReplacementNode extends MethodNode {
     Frame<BasicValue>[] frames;
     try {
       frames = analyzer.analyze(className, this);
-    } catch (AnalyzerException e) {
+    } catch (final AnalyzerException e) {
       throw new IllegalStateException(e);
     }
 
-    if (logger.isDebugEnabled()) {
+    if (logger.isTraceEnabled()) {
       final StringBuilder sb = new StringBuilder();
       sb.append("ReplacingBasicValues for " + className + "\n");
-      for(ReplacingBasicValue value : valueList) {
+      for(final ReplacingBasicValue value : valueList) {
         value.dump(sb, 2);
         sb.append('\n');
       }
@@ -75,14 +75,14 @@ public class ScalarReplacementNode extends MethodNode {
     }
 
     // wrap the instruction handler so that we can do additional things
-    TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
+    final TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
     this.instructions = list;
 
     MethodVisitor methodVisitor = inner;
     if (verifyBytecode) {
       methodVisitor = new CheckMethodVisitorFsm(CompilationConfig.ASM_API_VERSION, methodVisitor);
     }
-    InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
+    final InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
         this.signature, this.exceptionsArr, list, methodVisitor);
     accept(holderV);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 2d22d84..c953bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -58,6 +58,7 @@ public class BufferManager implements AutoCloseable {
         ((DrillBuf)mbuffers[i]).release();
       }
     }
+    managedBuffers.clear();
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
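
The managedBuffers.clear() added above is what makes close() safe to call more than once: a second close() iterates an empty list instead of releasing the same buffers again. The same pattern in isolation (sketch with a stand-in resource type, not Drill's DrillBuf accounting):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class ReleasingList implements Closeable {
      private final List<Closeable> managed = new ArrayList<>();

      void register(final Closeable c) {
        managed.add(c);
      }

      @Override
      public void close() throws IOException {
        for (final Closeable c : managed) {
          c.close();
        }
        managed.clear();   // idempotent: a later close() finds nothing to release
      }
    }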

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 44ca78a..c46613d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.ops;
 
-import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -27,7 +26,6 @@ import java.util.Map;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
-import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -53,6 +51,8 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 /**
@@ -72,31 +72,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private IncomingBuffers buffers;
   private final OptionManager fragmentOptions;
   private final BufferManager bufferManager;
+  private ExecutorState executorState;
 
-  private final DeferredException deferredException = new DeferredException();
-  private volatile FragmentContextState state = FragmentContextState.OK;
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-
     @Override
-    public void accept(RpcException e) {
+    public void accept(final RpcException e) {
       fail(e);
     }
   };
+
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
   private final AccountingUserConnection accountingUserConnection;
 
-  /*
-   * TODO we need a state that indicates that cancellation has been requested and
-   * is in progress. Early termination (such as from limit queries) could also use
-   * this, as the cleanup steps should be exactly the same.
-   */
-  private static enum FragmentContextState {
-    OK,
-    FAILED,
-    CANCELED
-  }
-
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
     throws ExecutionSetupException {
@@ -111,14 +99,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
 
     try {
-      OptionList list;
+      final OptionList list;
       if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
         list = new OptionList();
       } else {
         list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
       }
       fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
 
@@ -127,8 +115,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     try {
       allocator = context.getAllocator().getChildAllocator(
           this, fragment.getMemInitial(), fragment.getMemMax(), true);
-      assert (allocator != null);
-    } catch(Throwable e) {
+      Preconditions.checkNotNull(allocator, "Unable to acquire allocator");
+    } catch(final Throwable e) {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
     }
 
@@ -140,37 +128,29 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return fragmentOptions;
   }
 
-  public void setBuffers(IncomingBuffers buffers) {
+  public void setBuffers(final IncomingBuffers buffers) {
+    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
     this.buffers = buffers;
   }
 
-  public void fail(Throwable cause) {
-    final FragmentHandle fragmentHandle = fragment.getHandle();
-
-    UserException dse = UserException.systemError(cause).addIdentity(getIdentity()).build();
-
-    // log the error id
-    logger.error("Fragment Context received failure -- Fragment: {}:{}",
-      fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), dse);
-
-    setState(FragmentContextState.FAILED);
-    deferredException.addThrowable(dse);
+  public void setExecutorState(final ExecutorState executorState) {
+    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+    this.executorState = executorState;
   }
 
-  public void cancel() {
-    setState(FragmentContextState.CANCELED);
+  public void fail(final Throwable cause) {
+    executorState.fail(cause);
   }
 
   /**
-   * Allowed transitions from left to right: OK -> FAILED -> CANCELED
-   * @param newState
+   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
+   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
+   * often so that Drill is responsive to cancellation operations.
+   *
+   * @return false if the action should terminate immediately, true if everything is okay.
    */
-  private synchronized void setState(FragmentContextState newState) {
-    if (state == FragmentContextState.OK) {
-      state = newState;
-    } else if (newState == FragmentContextState.CANCELED) {
-      state = newState;
-    }
+  public boolean shouldContinue() {
+    return executorState.shouldContinue();
   }
 
   public DrillbitContext getDrillbitContext() {
@@ -224,6 +204,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return frag;
   }
 
+
+
   /**
    * Get this fragment's allocator.
    * @return the allocator
@@ -252,11 +234,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getCompiler().getImplementationClass(cg);
   }
 
-  public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
     return getImplementationClass(cg.getCodeGenerator(), instanceCount);
   }
 
-  public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
     return context.getCompiler().getImplementationClass(cg, instanceCount);
   }
 
@@ -269,7 +251,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getController().getTunnel(endpoint);
   }
 
-  public AccountingDataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
     AccountingDataTunnel tunnel = tunnels.get(endpoint);
     if (tunnel == null) {
       tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
@@ -282,16 +264,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return buffers;
   }
 
+  @VisibleForTesting
+  @Deprecated
   public Throwable getFailureCause() {
-    return deferredException.getException();
+    return executorState.getFailureCause();
   }
 
+  @VisibleForTesting
+  @Deprecated
   public boolean isFailed() {
-    return state == FragmentContextState.FAILED;
-  }
-
-  public boolean isCancelled() {
-    return state == FragmentContextState.CANCELED;
+    return executorState.isFailed();
   }
 
   public FunctionImplementationRegistry getFunctionRegistry() {
@@ -306,24 +288,25 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     allocator.setFragmentLimit(limit);
   }
 
-  public DeferredException getDeferredException() {
-    return deferredException;
-  }
-
   @Override
-  public void close() throws Exception {
-    /*
-     * TODO wait for threads working on this Fragment to terminate (or at least stop working
-     * on this Fragment's query)
-     */
-    deferredException.suppressingClose(bufferManager);
-    deferredException.suppressingClose(buffers);
-    deferredException.suppressingClose(allocator);
+  public void close() {
+    waitForSendComplete();
+    suppressingClose(bufferManager);
+    suppressingClose(buffers);
+    suppressingClose(allocator);
+  }
 
-    deferredException.close(); // must be last, as this may throw
+  private void suppressingClose(final AutoCloseable closeable) {
+    try {
+      if (closeable != null) {
+        closeable.close();
+      }
+    } catch (final Exception e) {
+      fail(e);
+    }
   }
 
-  public DrillBuf replace(DrillBuf old, int newSize) {
+  public DrillBuf replace(final DrillBuf old, final int newSize) {
     return bufferManager.replace(old, newSize);
   }
 
@@ -332,7 +315,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return bufferManager.getManagedBuffer();
   }
 
-  public DrillBuf getManagedBuffer(int size) {
+  public DrillBuf getManagedBuffer(final int size) {
     return bufferManager.getManagedBuffer(size);
   }
 
@@ -350,4 +333,30 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     sendingAccountor.waitForSendComplete();
   }
 
+  public interface ExecutorState {
+    /**
+     * Whether execution should continue.
+     *
+     * @return false if execution should stop.
+     */
+    public boolean shouldContinue();
+
+    /**
+     * Inform the executor if an exception occurs and the fragment should be failed.
+     *
+     * @param t
+     *          The exception that occurred.
+     */
+    public void fail(final Throwable t);
+
+    @VisibleForTesting
+    @Deprecated
+    public boolean isFailed();
+
+    @VisibleForTesting
+    @Deprecated
+    public Throwable getFailureCause();
+
+  }
+
 }
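
Because FragmentContext now delegates all failure/continuation state to ExecutorState, tests need a stand-in; the commit notes that SimpleRootExec creates a fake one. A minimal illustrative fake (not the actual test code):

    class FakeExecutorState implements FragmentContext.ExecutorState {
      private volatile Throwable failureCause;

      @Override
      public boolean shouldContinue() {
        return true;                    // never ask operators to stop
      }

      @Override
      public void fail(final Throwable t) {
        failureCause = t;               // record the failure for later inspection
      }

      @Override
      public boolean isFailed() {
        return failureCause != null;
      }

      @Override
      public Throwable getFailureCause() {
        return failureCause;
      }
    }

    // Usage: fragmentContext.setExecutorState(new FakeExecutorState());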

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 5b7ca66..628dcd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -34,7 +34,7 @@ public abstract class BaseRootExec implements RootExec {
   protected OperatorContext oContext = null;
   protected FragmentContext fragmentContext = null;
 
-  public BaseRootExec(FragmentContext fragmentContext, PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = new OperatorContext(config, fragmentContext, stats, true);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -43,7 +43,7 @@ public abstract class BaseRootExec implements RootExec {
     this.fragmentContext = fragmentContext;
   }
 
-  public BaseRootExec(FragmentContext fragmentContext, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
       config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -56,7 +56,7 @@ public abstract class BaseRootExec implements RootExec {
   public final boolean next() {
     // Stats should have been initialized
     assert stats != null;
-    if (fragmentContext.isFailed()) {
+    if (!fragmentContext.shouldContinue()) {
       return false;
     }
     try {
@@ -67,7 +67,7 @@ public abstract class BaseRootExec implements RootExec {
     }
   }
 
-  public final IterOutcome next(RecordBatch b){
+  public final IterOutcome next(final RecordBatch b){
     stats.stopProcessing();
     IterOutcome next;
     try {
@@ -90,7 +90,7 @@ public abstract class BaseRootExec implements RootExec {
   public abstract boolean innerNext();
 
   @Override
-  public void receivingFragmentFinished(FragmentHandle handle) {
+  public void receivingFragmentFinished(final FragmentHandle handle) {
     logger.warn("Currently not handling FinishedFragment message");
   }
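
The next(RecordBatch) hunk above stops this operator's processing clock before
delegating to the child and, presumably in a finally block not shown in the
hunk, restarts it afterwards, so wall time is attributed to exactly one
operator at a time. A self-contained sketch of that hand-off, with all names
invented:

    class SketchStats {
      private long start;
      private long total;
      void startProcessing() { start = System.nanoTime(); }
      void stopProcessing()  { total += System.nanoTime() - start; }
      long totalNanos()      { return total; }
    }

    class SketchRootExec {
      final SketchStats stats = new SketchStats();

      // pause our clock while the child produces its next result
      String next(final java.util.function.Supplier<String> child) {
        stats.stopProcessing();
        try {
          return child.get();
        } finally {
          stats.startProcessing();
        }
      }

      public static void main(final String[] args) {
        final SketchRootExec exec = new SketchRootExec();
        exec.stats.startProcessing();
        exec.next(() -> "OK");
        exec.stats.stopProcessing();
        System.out.println("operator nanos: " + exec.stats.totalNanos());
      }
    }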
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e230fd2..b8ef690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -77,7 +76,6 @@ import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
 
 import parquet.Preconditions;
 
@@ -97,12 +95,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
   private RecordBatchLoader[] batchLoaders;
-  private RawFragmentBatchProvider[] fragProviders;
-  private FragmentContext context;
+  private final RawFragmentBatchProvider[] fragProviders;
+  private final FragmentContext context;
   private BatchSchema schema;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
-  private MergingReceiverPOP config;
+  private final MergingReceiverPOP config;
   private boolean hasRun = false;
   private boolean prevBatchWasFull = false;
   private boolean hasMoreIncoming = true;
@@ -126,9 +124,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
-  public MergingRecordBatch(FragmentContext context,
-                            MergingReceiverPOP config,
-                            RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
+  public MergingRecordBatch(final FragmentContext context,
+                            final MergingReceiverPOP config,
+                            final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
     super(config, context, true, new OperatorContext(config, context, false));
     //super(config, context);
     this.fragProviders = fragProviders;
@@ -138,10 +136,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.config = config;
   }
 
-  private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+  private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
     stats.startWait();
     try {
-      RawFragmentBatch b = provider.getNext();
+      final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
@@ -177,9 +175,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       schemaChanged = true; // first iteration is always a schema change
 
       // set up each (non-empty) incoming record batch
-      List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+      final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
       int p = 0;
-      for (RawFragmentBatchProvider provider : fragProviders) {
+      for (final RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
           // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
@@ -190,10 +188,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatch = getNext(provider);
           }
           p++;
-          if (rawBatch == null && context.isCancelled()) {
+          if (rawBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
@@ -208,10 +206,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
               ;
             }
-            if (rawBatch == null && context.isCancelled()) {
+            if (rawBatch == null && !context.shouldContinue()) {
               return IterOutcome.STOP;
             }
-          } catch (IOException e) {
+          } catch (final IOException e) {
             context.fail(e);
             return IterOutcome.STOP;
           }
@@ -234,12 +232,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       int i = 0;
-      for (RawFragmentBatch batch : incomingBatches) {
+      for (final RawFragmentBatch batch : incomingBatches) {
         // initialize the incoming batchLoaders
-        UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+        final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
         try {
           batchLoaders[i].load(rbd, batch.getBody());
-        } catch(SchemaChangeException e) {
+        } catch(final SchemaChangeException e) {
           logger.error("MergingReceiver failed to load record batch from remote host.  {}", e);
           context.fail(e);
           return IterOutcome.STOP;
@@ -250,7 +248,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
-      for (RecordBatchLoader loader : batchLoaders) {
+      for (final RecordBatchLoader loader : batchLoaders) {
         loader.canonicalize();
       }
 
@@ -262,15 +260,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // create the outgoing schema and vector container, and allocate the initial batch
-      SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+      final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
       int vectorCount = 0;
-      for (VectorWrapper<?> v : batchLoaders[0]) {
+      for (final VectorWrapper<?> v : batchLoaders[0]) {
 
         // add field to the output schema
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
+        final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
         ++vectorCount;
       }
       allocateOutgoing();
@@ -286,7 +284,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // generate code for merge operations (copy and compare)
       try {
         merger = createMerger();
-      } catch (SchemaChangeException e) {
+      } catch (final SchemaChangeException e) {
         logger.error("Failed to generate code for MergingReceiver.  {}", e);
         context.fail(e);
         return IterOutcome.STOP;
@@ -294,9 +292,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // allocate the priority queue with the generated comparator
       this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
-        public int compare(Node node1, Node node2) {
-          int leftIndex = (node1.batchId << 16) + node1.valueIndex;
-          int rightIndex = (node2.batchId << 16) + node2.valueIndex;
+        public int compare(final Node node1, final Node node2) {
+          final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
+          final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
           return merger.doEval(leftIndex, rightIndex);
         }
       });
@@ -305,14 +303,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       for (int b = 0; b < senderCount; ++b) {
         while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
           try {
-            RawFragmentBatch batch = getNext(fragProviders[b]);
+            final RawFragmentBatch batch = getNext(fragProviders[b]);
             incomingBatches[b] = batch;
             if (batch != null) {
               batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
-              if (context.isCancelled()) {
+              if (!context.shouldContinue()) {
                 return IterOutcome.STOP;
               }
             }
@@ -332,7 +330,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
     while (!pqueue.isEmpty()) {
       // pop next value from pq and copy to outgoing batch
-      Node node = pqueue.peek();
+      final Node node = pqueue.peek();
       if (!copyRecordToOutgoingBatch(node)) {
         logger.debug("Outgoing vectors space is full; breaking");
         prevBatchWasFull = true;
@@ -355,10 +353,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
             nextBatch = getNext(fragProviders[node.batchId]);
           }
-          if (nextBatch == null && context.isCancelled()) {
+          if (nextBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
@@ -369,7 +367,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           // batch is empty
           boolean allBatchesEmpty = true;
 
-          for (RawFragmentBatch batch : incomingBatches) {
+          for (final RawFragmentBatch batch : incomingBatches) {
             // see if all batches are empty so we can return OK_* or NONE
             if (batch != null) {
               allBatchesEmpty = false;
@@ -387,10 +385,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           continue;
         }
 
-        UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
+        final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
         try {
           batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
-        } catch(SchemaChangeException ex) {
+        } catch(final SchemaChangeException ex) {
           context.fail(ex);
           return IterOutcome.STOP;
         }
@@ -412,7 +410,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
 
     // set the value counts in the outgoing vectors
-    for (VectorWrapper vw : outgoingContainer) {
+    for (final VectorWrapper vw : outgoingContainer) {
       vw.getValueVector().getMutator().setValueCount(outgoingPosition);
     }
 
@@ -448,19 +446,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           state = BatchState.DONE;
           return;
         }
-        RawFragmentBatch batch = getNext(fragProviders[i]);
+        final RawFragmentBatch batch = getNext(fragProviders[i]);
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
           continue;
         }
         tempBatchHolder[i] = batch;
-        for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
-          ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+        for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+          final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
           v.allocateNew();
         }
         break;
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
     outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
@@ -473,27 +471,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
     } else {
       cleanup();
-      for (RawFragmentBatchProvider provider : fragProviders) {
+      for (final RawFragmentBatchProvider provider : fragProviders) {
         provider.kill(context);
       }
     }
   }
 
   private void informSenders() {
-    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+    logger.info("Informing senders of request to terminate sending.");
+    final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
-      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+    for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+      final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
               .setMinorFragmentId(providingEndpoint.getId())
               .build();
-      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+      final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
@@ -504,18 +503,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
-    public void failed(RpcException ex) {
+    public void failed(final RpcException ex) {
       logger.warn("Failed to inform upstream that receiver is finished");
     }
 
     @Override
-    public void success(Ack value, ByteBuf buffer) {
+    public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     //No op
   }
 
@@ -535,12 +534,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return outgoingContainer.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return outgoingContainer.getValueAccessorById(clazz, ids);
   }
 
@@ -549,10 +548,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     return WritableBatch.get(this);
   }
 
-  private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
+  private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
     Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
 
-    BatchSchema schema = batchLoaders[0].getSchema();
+    final BatchSchema schema = batchLoaders[0].getSchema();
 
     for (int i = 1; i < batchLoaders.length; i++) {
       if (!schema.equals(batchLoaders[i].getSchema())) {
@@ -564,8 +563,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private void allocateOutgoing() {
-    for (VectorWrapper w : outgoingContainer) {
-      ValueVector v = w.getValueVector();
+    for (final VectorWrapper w : outgoingContainer) {
+      final ValueVector v = w.getValueVector();
       if (v instanceof FixedWidthVector) {
         AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
       } else {
@@ -587,12 +586,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
 
     try {
-      CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-      ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
+      final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+      final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
 
       ExpandableHyperContainer batch = null;
       boolean first = true;
-      for (RecordBatchLoader loader : batchLoaders) {
+      for (final RecordBatchLoader loader : batchLoaders) {
         if (first) {
           batch = new ExpandableHyperContainer(loader);
           first = false;
@@ -606,7 +605,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       g.setMappingSet(COPIER_MAPPING_SET);
       CopyUtil.generateCopies(g, batch, true);
       g.setMappingSet(MAIN_MAPPING);
-      MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
+      final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
 
       merger.doSetup(context, batch, outgoingContainer);
       return merger;
@@ -621,28 +620,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
   public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
 
-  private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+  private void generateComparisons(final ClassGenerator g, final VectorAccessible batch) throws SchemaChangeException {
     g.setMappingSet(MAIN_MAPPING);
 
-    for (Ordering od : popConfig.getOrderings()) {
+    for (final Ordering od : popConfig.getOrderings()) {
       // first, we rewrite the evaluation stack for each side of the comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
+      final ErrorCollector collector = new ErrorCollectorImpl();
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
       g.setMappingSet(LEFT_MAPPING);
-      HoldingContainer left = g.addExpr(expr, false);
+      final HoldingContainer left = g.addExpr(expr, false);
       g.setMappingSet(RIGHT_MAPPING);
-      HoldingContainer right = g.addExpr(expr, false);
+      final HoldingContainer right = g.addExpr(expr, false);
       g.setMappingSet(MAIN_MAPPING);
 
       // next we wrap the two comparison sides and add the expression block for the comparison.
-      LogicalExpression fh =
+      final LogicalExpression fh =
           FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
                                                          context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, false);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      final HoldingContainer out = g.addExpr(fh, false);
+      final JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 
       if (od.getDirection() == Direction.ASCENDING) {
         jc._then()._return(out.getValue());
@@ -660,8 +659,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
    *
    * @param node Reference to the next record to copy from the incoming batches
    */
-  private boolean copyRecordToOutgoingBatch(Node node) {
-    int inIndex = (node.batchId << 16) + node.valueIndex;
+  private boolean copyRecordToOutgoingBatch(final Node node) {
+    final int inIndex = (node.batchId << 16) + node.valueIndex;
     merger.doCopy(inIndex, outgoingPosition);
     outgoingPosition++;
     if (outgoingPosition == OUTGOING_BATCH_SIZE) {
@@ -677,7 +676,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public class Node {
     public int batchId;      // incoming batch
     public int valueIndex;   // value within the batch
-    Node(int batchId, int valueIndex) {
+    Node(final int batchId, final int valueIndex) {
       this.batchId = batchId;
       this.valueIndex = valueIndex;
     }
@@ -687,7 +686,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public void cleanup() {
     outgoingContainer.clear();
     if (batchLoaders != null) {
-      for (RecordBatchLoader rbl : batchLoaders) {
+      for (final RecordBatchLoader rbl : batchLoaders) {
         if (rbl != null) {
           rbl.clear();
         }
@@ -695,7 +694,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
     oContext.close();
     if (fragProviders != null) {
-      for (RawFragmentBatchProvider f : fragProviders) {
+      for (final RawFragmentBatchProvider f : fragProviders) {
         f.cleanup();
       }
     }
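
The comparator and copyRecordToOutgoingBatch hunks above both build the index
(batchId << 16) + valueIndex: the upper bits select the sender's batch and the
low 16 bits the row within it, which is consistent with OUTGOING_BATCH_SIZE
staying at 32 * 1024 so every row index fits in 16 bits. A sketch of that
packing; the class and method names are invented:

    class CompositeIndexSketch {
      static int pack(final int batchId, final int valueIndex) {
        return (batchId << 16) + valueIndex; // requires valueIndex < 65536
      }
      static int batchId(final int index)    { return index >>> 16; }
      static int valueIndex(final int index) { return index & 0xFFFF; }

      public static void main(final String[] args) {
        final int idx = pack(3, 1234);
        System.out.println(batchId(idx) + ", " + valueIndex(idx)); // 3, 1234
      }
    }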

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c50cb8a..c2d6166 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,10 +20,8 @@ package org.apache.drill.exec.physical.impl.producer;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -42,16 +40,16 @@ import org.apache.drill.exec.vector.ValueVector;
 public class ProducerConsumerBatch extends AbstractRecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
 
-  private RecordBatch incoming;
-  private Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
+  private final RecordBatch incoming;
+  private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
   private boolean running = false;
-  private BlockingDeque<RecordBatchDataWrapper> queue;
+  private final BlockingDeque<RecordBatchDataWrapper> queue;
   private int recordCount;
   private BatchSchema schema;
   private boolean stop = false;
   private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
 
-  protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
     this.queue = new LinkedBlockingDeque<>(popConfig.getSize());
@@ -68,8 +66,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       stats.startWait();
       wrapper = queue.take();
       logger.debug("Got batch from queue");
-    } catch (InterruptedException e) {
-      if (!(context.isCancelled() || context.isFailed())) {
+    } catch (final InterruptedException e) {
+      if (!context.shouldContinue()) {
         context.fail(e);
       }
       return IterOutcome.STOP;
@@ -84,30 +82,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     }
 
     recordCount = wrapper.batch.getRecordCount();
-    boolean newSchema = load(wrapper.batch);
+    final boolean newSchema = load(wrapper.batch);
 
     return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
   }
 
-  private boolean load(RecordBatchData batch) {
-    VectorContainer newContainer = batch.getContainer();
+  private boolean load(final RecordBatchData batch) {
+    final VectorContainer newContainer = batch.getContainer();
     if (schema != null && newContainer.getSchema().equals(schema)) {
       container.zeroVectors();
-      BatchSchema schema = container.getSchema();
+      final BatchSchema schema = container.getSchema();
       for (int i = 0; i < container.getNumberOfColumns(); i++) {
-        MaterializedField field = schema.getColumn(i);
-        MajorType type = field.getType();
-        ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+        final MaterializedField field = schema.getColumn(i);
+        final MajorType type = field.getType();
+        final ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
                 container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
-        ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+        final ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
                 newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
-        TransferPair tp = vIn.makeTransferPair(vOut);
+        final TransferPair tp = vIn.makeTransferPair(vOut);
         tp.transfer();
       }
       return false;
     } else {
       container.clear();
-      for (VectorWrapper w : newContainer) {
+      for (final VectorWrapper w : newContainer) {
         container.add(w.getValueVector());
       }
       container.buildSchema(SelectionVectorMode.NONE);
@@ -128,7 +126,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
         }
         outer:
         while (true) {
-          IterOutcome upstream = incoming.next();
+          final IterOutcome upstream = incoming.next();
           switch (upstream) {
             case NONE:
               stop = true;
@@ -146,7 +144,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
               throw new UnsupportedOperationException();
           }
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
         // TODO InterruptedException
       } finally {
@@ -154,7 +152,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           try {
             clearQueue();
             queue.put(new RecordBatchDataWrapper(null, true, false));
-          } catch (InterruptedException e) {
+          } catch (final InterruptedException e) {
             logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
             // TODO InterruptedException
           }
@@ -177,12 +175,12 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     stop = true;
     producer.interrupt();
     try {
       producer.join();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer thread");
       // TODO InterruptedException
     }
@@ -193,7 +191,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     stop = true;
     try {
       cleanUpLatch.await();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
       // TODO InterruptedException
     } finally {
@@ -213,7 +211,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     boolean finished;
     boolean failed;
 
-    RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean failed) {
+    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
       this.batch = batch;
       this.finished = finished;
       this.failed = failed;