You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/19 03:03:41 UTC

[6/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection

DRILL-2762: Update Fragment state reporting and error collection

DeferredException
- Add new throwAndClear operation on to allow checking for exceptions preClose in FragmentContext
- Add new getAndClear operation

BufferManager
- Ensure close() can be called multiple times by clearing managed buffer list on close().

FragmentContext/FragmentExecutor
- Update FragmentContext to have a preClose so that we can check closure state before doing final close.
- Update so that there is only a single state maintained between FragmentContext and FragmentExecutor
- Clean up FragmentExecutor run() method to better manage error states and have only single terminal point (avoiding multiple messages to Foreman).
- Add new CANCELLATION_REQUESTED state for FragmentState.
- Move all users of isCancelled or isFailed in main code to use shouldContinue()
- Update receivingFragmentFinished message to not cancel fragment (only inform root operator of cancellation)

WorkManager Updates
- Add new afterExecute command to the WorkManager ExecutorService so that we get log entries if a thread leaks an exception.  (Otherwise logs don't show these exceptions and they only go to standard out.)

Profile Page
- Update profile page to show last update and last progress.
- Change durations to non-time presentation

Foreman/QueryManager
- Extract listenable interfaces into anonymous inner classes from body of Foreman

QueryManager
- Update QueryManager to track completed nodes rather than completed fragments using NodeTracker
- Update DrillbitStatusListener to decrement expected completion messages on Nodes that have died to avoid query hang when a node dies

FragmentData/MinorFragmentProfile
- Add ability to track last status update as well as last time fragment made progress

AbstractRecordBatch
- Update awareness of current cancellation state to avoid cancellation delays

Misc. Other changes
- Move ByteCode optimization code to only record assembly and code as trace messages
- Update SimpleRootExec to create fake ExecutorState to make existing tests work.
- Update sort to exit prematurely in the case that the fragment was asked to cancel.
- Add finals to all edited files.
- Modify control handler and FragmentManager to directly support receivingFragmentFinished
- Update receiver propagation message to avoid premature removal of fragment manager
- Update UserException.Builder to log a message if we're creating a new UserException (ERROR for System, INFO otherwise).
- Update Profile pages to use min and max instead of sorts.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c0d5a693
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c0d5a693
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c0d5a693

Branch: refs/heads/master
Commit: c0d5a693ac70c42019b5841eea91252f9eaa7792
Parents: 071ed89
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 8 19:06:03 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 17:56:21 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/common/DeferredException.java  |  47 ++-
 .../drill/common/concurrent/ExtendedLatch.java  |  89 +++++
 .../drill/common/exceptions/UserException.java  |  66 ++--
 .../org/apache/drill/exec/compile/AsmUtil.java  |  28 +-
 .../compile/bytecode/InstructionModifier.java   |  28 +-
 .../compile/bytecode/ScalarReplacementNode.java |  10 +-
 .../apache/drill/exec/ops/BufferManager.java    |   1 +
 .../apache/drill/exec/ops/FragmentContext.java  | 143 ++++----
 .../drill/exec/physical/impl/BaseRootExec.java  |  10 +-
 .../impl/mergereceiver/MergingRecordBatch.java  | 143 ++++----
 .../impl/producer/ProducerConsumerBatch.java    |  48 ++-
 .../impl/project/ProjectRecordBatch.java        | 250 +++++++------
 .../UnorderedReceiverBatch.java                 |  39 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   4 +
 .../exec/physical/impl/xsort/MSortTemplate.java |  43 ++-
 .../drill/exec/proto/helper/QueryIdHelper.java  |  23 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  26 +-
 .../exec/record/FragmentWritableBatch.java      |  50 ++-
 .../exec/rpc/data/DataResponseHandlerImpl.java  |  11 +-
 .../org/apache/drill/exec/server/Drillbit.java  |  16 +-
 .../exec/server/rest/profile/Comparators.java   |  34 +-
 .../server/rest/profile/FragmentWrapper.java    |  73 ++--
 .../server/rest/profile/OperatorWrapper.java    |  26 +-
 .../server/rest/profile/ProfileWrapper.java     |  49 +--
 .../exec/server/rest/profile/TableBuilder.java  |  77 ++--
 .../org/apache/drill/exec/work/WorkManager.java |  34 +-
 .../exec/work/batch/ControlHandlerImpl.java     |  20 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  37 +-
 .../apache/drill/exec/work/foreman/Foreman.java | 127 +++----
 .../drill/exec/work/foreman/FragmentData.java   |  77 +++-
 .../drill/exec/work/foreman/QueryManager.java   | 292 ++++++++++-----
 .../work/fragment/AbstractStatusReporter.java   |  62 +---
 .../exec/work/fragment/FragmentExecutor.java    | 357 +++++++++++--------
 .../exec/work/fragment/FragmentManager.java     |   6 +-
 .../work/fragment/NonRootFragmentManager.java   |  40 ++-
 .../exec/work/fragment/RootFragmentManager.java |  17 +-
 .../work/fragment/StateTransitionException.java |  12 +-
 .../exec/work/fragment/StatusReporter.java      |   2 +-
 .../exec/physical/impl/SimpleRootExec.java      |  40 ++-
 .../work/batch/TestUnlimitedBatchBuffer.java    |   4 +-
 .../drill/exec/proto/SchemaUserBitShared.java   |  14 +
 .../apache/drill/exec/proto/UserBitShared.java  | 249 +++++++++++--
 .../drill/exec/proto/beans/FragmentState.java   |   4 +-
 .../exec/proto/beans/MinorFragmentProfile.java  |  44 +++
 protocol/src/main/protobuf/UserBitShared.proto  |   3 +
 45 files changed, 1718 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/DeferredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/DeferredException.java b/common/src/main/java/org/apache/drill/common/DeferredException.java
index 99f18f1..c7111a9 100644
--- a/common/src/main/java/org/apache/drill/common/DeferredException.java
+++ b/common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -71,9 +71,32 @@ public class DeferredException implements AutoCloseable {
    *
    * @return the deferred exception, or null
    */
-  public Exception getException() {
-    synchronized(this) {
-      return exception;
+  public synchronized Exception getException() {
+    return exception;
+  }
+
+  public synchronized Exception getAndClear() {
+    Preconditions.checkState(!isClosed);
+
+    if (exception != null) {
+      final Exception local = exception;
+      exception = null;
+      return local;
+    }
+
+    return null;
+  }
+
+  /**
+   * If an exception exists, will throw the exception and then clear it. This is so in cases where want to reuse
+   * DeferredException, we don't double report the same exception.
+   *
+   * @throws Exception
+   */
+  public synchronized void throwAndClear() throws Exception{
+    final Exception e = getAndClear();
+    if (e != null) {
+      throw e;
     }
   }
 
@@ -98,24 +121,18 @@ public class DeferredException implements AutoCloseable {
 
       try {
         autoCloseable.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         addException(e);
       }
     }
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized(this) {
-      Preconditions.checkState(!isClosed);
-
-      try {
-        if (exception != null) {
-          throw exception;
-        }
-      } finally {
-        isClosed = true;
-      }
+  public synchronized void close() throws Exception {
+    try {
+      throwAndClear();
+    } finally {
+      isClosed = true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
new file mode 100644
index 0000000..a75ac32
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An extended CountDownLatch which allows us to await uninterruptibly.
+ */
+public class ExtendedLatch extends CountDownLatch {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExtendedLatch.class);
+
+  public ExtendedLatch() {
+    super(1);
+  }
+
+  public ExtendedLatch(final int count) {
+    super(count);
+  }
+
+  /**
+   * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
+   * state variable or similar.
+   *
+   * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
+   */
+  protected boolean ignoreInterruptions() {
+    return true;
+  }
+
+  /**
+   * Await without interruption for a given time.
+   * @param waitMillis
+   *          Time in milliseconds to wait
+   * @return Whether the countdown reached zero or not.
+   */
+  public boolean awaitUninterruptibly(long waitMillis) {
+    final long targetMillis = System.currentTimeMillis() + waitMillis;
+    while (System.currentTimeMillis() < targetMillis) {
+      final long wait = targetMillis - System.currentTimeMillis();
+      if (wait < 1) {
+        return false;
+      }
+
+      try {
+        return await(wait, TimeUnit.MILLISECONDS);
+      } catch (final InterruptedException e) {
+        // if we weren't ready, the while loop will continue to wait
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
+   * output of ignoreInterruptions();
+   */
+  public void awaitUninterruptibly() {
+    while (true) {
+      try {
+        await();
+        return;
+      } catch (final InterruptedException e) {
+        if (ignoreInterruptions()) {
+          // if we're still not ready, the while loop will cause us to wait again
+          logger.warn("Interrupted while waiting for event latch.", e);
+        } else {
+          return;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index e995346..9283339 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -53,7 +53,7 @@ public class UserException extends DrillRuntimeException {
    *             Rpc layer or UserResultListener.submitFailed()
    */
   @Deprecated
-  public static Builder systemError(Throwable cause) {
+  public static Builder systemError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.SYSTEM, cause);
   }
 
@@ -79,7 +79,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder connectionError(Throwable cause) {
+  public static Builder connectionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.CONNECTION, cause);
   }
 
@@ -105,7 +105,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder dataReadError(Throwable cause) {
+  public static Builder dataReadError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.DATA_READ, cause);
   }
 
@@ -131,7 +131,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder dataWriteError(Throwable cause) {
+  public static Builder dataWriteError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.DATA_WRITE, cause);
   }
 
@@ -157,7 +157,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder functionError(Throwable cause) {
+  public static Builder functionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.FUNCTION, cause);
   }
 
@@ -183,7 +183,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder parseError(Throwable cause) {
+  public static Builder parseError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PARSE, cause);
   }
 
@@ -209,7 +209,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder permissionError(Throwable cause) {
+  public static Builder permissionError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PERMISSION, cause);
   }
 
@@ -235,7 +235,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder planError(Throwable cause) {
+  public static Builder planError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.PLAN, cause);
   }
 
@@ -261,7 +261,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder resourceError(Throwable cause) {
+  public static Builder resourceError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.RESOURCE, cause);
   }
 
@@ -287,7 +287,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
-  public static Builder unsupportedError(Throwable cause) {
+  public static Builder unsupportedError(final Throwable cause) {
     return new Builder(DrillPBError.ErrorType.UNSUPPORTED_OPERATION, cause);
   }
 
@@ -313,7 +313,7 @@ public class UserException extends DrillRuntimeException {
      *                  or doesn't wrap a user exception
      * @param cause exception to wrap inside a user exception. Can be null
      */
-    private Builder(DrillPBError.ErrorType errorType, Throwable cause) {
+    private Builder(final DrillPBError.ErrorType errorType, final Throwable cause) {
       this.cause = cause;
 
       //TODO handle the improbable case where cause is a SYSTEM exception ?
@@ -339,7 +339,7 @@ public class UserException extends DrillRuntimeException {
      * @param args Arguments referenced by the format specifiers in the format string
      * @return this builder
      */
-    public Builder message(String format, Object... args) {
+    public Builder message(final String format, final Object... args) {
       // we can't replace the message of a user exception
       if (uex == null && format != null) {
         this.message = String.format(format, args);
@@ -353,7 +353,7 @@ public class UserException extends DrillRuntimeException {
      *
      * @param endpoint drillbit endpoint identity
      */
-    public Builder addIdentity(CoordinationProtos.DrillbitEndpoint endpoint) {
+    public Builder addIdentity(final CoordinationProtos.DrillbitEndpoint endpoint) {
       context.add(endpoint);
       return this;
     }
@@ -363,7 +363,7 @@ public class UserException extends DrillRuntimeException {
      * @param value string line
      * @return this builder
      */
-    public Builder addContext(String value) {
+    public Builder addContext(final String value) {
       context.add(value);
       return this;
     }
@@ -375,7 +375,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, String value) {
+    public Builder addContext(final String name, final String value) {
       context.add(name, value);
       return this;
     }
@@ -387,7 +387,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, long value) {
+    public Builder addContext(final String name, final long value) {
       context.add(name, value);
       return this;
     }
@@ -399,7 +399,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder addContext(String name, double value) {
+    public Builder addContext(final String name, final double value) {
       context.add(name, value);
       return this;
     }
@@ -410,7 +410,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String value) {
+    public Builder pushContext(final String value) {
       context.push(value);
       return this;
     }
@@ -422,7 +422,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, String value) {
+    public Builder pushContext(final String name, final String value) {
       context.push(name, value);
       return this;
     }
@@ -434,7 +434,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, long value) {
+    public Builder pushContext(final String name, final long value) {
       context.push(name, value);
       return this;
     }
@@ -446,7 +446,7 @@ public class UserException extends DrillRuntimeException {
      * @param value context value
      * @return this builder
      */
-    public Builder pushContext(String name, double value) {
+    public Builder pushContext(final String name, final double value) {
       context.push(name, value);
       return this;
     }
@@ -462,7 +462,19 @@ public class UserException extends DrillRuntimeException {
         return uex;
       }
 
-      return new UserException(this);
+      final UserException newException = new UserException(this);
+
+      // since we just created a new exception, we should log it for later reference. If this is a system error, this is
+      // an issue that the Drill admin should pay attention to and we should log as ERROR. However, if this is a user
+      // mistake or data read issue, the system admin should not be concerned about these and thus we'll log this
+      // as an INFO message.
+      if (errorType == DrillPBError.ErrorType.SYSTEM) {
+        logger.error(newException.getMessage(), newException);
+      } else {
+        logger.info("User Error Occurred", newException);
+      }
+
+      return newException;
     }
   }
 
@@ -470,14 +482,14 @@ public class UserException extends DrillRuntimeException {
 
   private final UserExceptionContext context;
 
-  protected UserException(DrillPBError.ErrorType errorType, String message, Throwable cause) {
+  protected UserException(final DrillPBError.ErrorType errorType, final String message, final Throwable cause) {
     super(message, cause);
 
     this.errorType = errorType;
     this.context = new UserExceptionContext();
   }
 
-  private UserException(Builder builder) {
+  private UserException(final Builder builder) {
     super(builder.message, builder.cause);
     this.errorType = builder.errorType;
     this.context = builder.context;
@@ -516,10 +528,10 @@ public class UserException extends DrillRuntimeException {
    * @param verbose should the error object contain the verbose error message ?
    * @return protobuf error object
    */
-  public DrillPBError getOrCreatePBError(boolean verbose) {
-    String message = verbose ? getVerboseMessage() : getMessage();
+  public DrillPBError getOrCreatePBError(final boolean verbose) {
+    final String message = verbose ? getVerboseMessage() : getMessage();
 
-    DrillPBError.Builder builder = DrillPBError.newBuilder();
+    final DrillPBError.Builder builder = DrillPBError.newBuilder();
     builder.setErrorType(errorType);
     builder.setErrorId(context.getErrorId());
     if (context.getEndpoint() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
index 81904df..5e7a9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
@@ -54,7 +54,7 @@ public class AsmUtil {
     final ClassReader ver = new ClassReader(verifyWriter.toByteArray());
     try {
       DrillCheckClassAdapter.verify(ver, false, new PrintWriter(sw));
-    } catch(Exception e) {
+    } catch(final Exception e) {
       logger.info("Caught exception verifying class:");
       logClass(logger, logTag, classNode);
       throw e;
@@ -98,19 +98,25 @@ public class AsmUtil {
   /**
    * Write a class to the log.
    *
-   * <p>Writes at level DEBUG.
+   * <p>
+   * Writes at level TRACE.
    *
-   * @param logger the logger to write to
-   * @param logTag a tag to print to the log
-   * @param classNode the class
+   * @param logger
+   *          the logger to write to
+   * @param logTag
+   *          a tag to print to the log
+   * @param classNode
+   *          the class
    */
   public static void logClass(final Logger logger, final String logTag, final ClassNode classNode) {
-    logger.debug(logTag);
-    final StringWriter stringWriter = new StringWriter();
-    final PrintWriter printWriter = new PrintWriter(stringWriter);
-    final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
-    classNode.accept(traceClassVisitor);
-    logger.debug(stringWriter.toString());
+    if (logger.isTraceEnabled()) {
+      logger.trace(logTag);
+      final StringWriter stringWriter = new StringWriter();
+      final PrintWriter printWriter = new PrintWriter(stringWriter);
+      final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
+      classNode.accept(traceClassVisitor);
+      logger.trace(stringWriter.toString());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
index 6c0292e..d25b1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
@@ -51,12 +51,6 @@ public class InstructionModifier extends MethodVisitor {
 
   private int stackIncrease = 0; // how much larger we have to make the stack
 
-  /*
-   * True if we've transferred holder members to locals. If so, then the maximum stack
-   * size must be increased by one to accommodate the extra DUP that is used to do so.
-   */
-  private boolean transferredToLocals = false;
-
   public InstructionModifier(final int access, final String name, final String desc,
       final String signature, final String[] exceptions, final TrackingInstructionList list,
       final MethodVisitor inner) {
@@ -116,7 +110,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitInsn(int opcode) {
+  public void visitInsn(final int opcode) {
     switch (opcode) {
     case Opcodes.DUP:
       /*
@@ -276,7 +270,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitTypeInsn(int opcode, String type) {
+  public void visitTypeInsn(final int opcode, final String type) {
     /*
      * This includes NEW, NEWARRAY, CHECKCAST, or INSTANCEOF.
      *
@@ -285,9 +279,9 @@ public class InstructionModifier extends MethodVisitor {
      * replaced the values for those, but we might find other reasons to replace
      * things, in which case this will be too broad.
      */
-    ReplacingBasicValue r = getFunctionReturn();
+    final ReplacingBasicValue r = getFunctionReturn();
     if (r != null) {
-      ValueHolderSub sub = r.getIden().getHolderSub(adder);
+      final ValueHolderSub sub = r.getIden().getHolderSub(adder);
       oldToNew.put(r.getIndex(), sub);
     } else {
       super.visitTypeInsn(opcode, type);
@@ -295,7 +289,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitLineNumber(int line, Label start) {
+  public void visitLineNumber(final int line, final Label start) {
     lastLineNumber = line;
     super.visitLineNumber(line, start);
   }
@@ -347,7 +341,7 @@ public class InstructionModifier extends MethodVisitor {
 
   @Override
   public void visitMaxs(final int maxStack, final int maxLocals) {
-    super.visitMaxs(maxStack + stackIncrease + (transferredToLocals ? 1 : 0), maxLocals);
+    super.visitMaxs(maxStack + stackIncrease, maxLocals);
   }
 
   @Override
@@ -397,7 +391,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+  public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc) {
     /*
      * This method was deprecated in the switch from api version ASM4 to ASM5.
      * If we ever go back (via CompilationConfig.ASM_API_VERSION), we need to
@@ -408,7 +402,7 @@ public class InstructionModifier extends MethodVisitor {
   }
 
   @Override
-  public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) {
+  public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc, final boolean itf) {
     // this version of visitMethodInsn() came after ASM4
     assert CompilationConfig.ASM_API_VERSION != Opcodes.ASM4;
 
@@ -471,7 +465,7 @@ public class InstructionModifier extends MethodVisitor {
 
   @Override
   public void visitEnd() {
-    if (logger.isDebugEnabled()) {
+    if (logger.isTraceEnabled()) {
       final StringBuilder sb = new StringBuilder();
       sb.append("InstructionModifier ");
       sb.append(name);
@@ -488,7 +482,7 @@ public class InstructionModifier extends MethodVisitor {
       int itemCount = 0; // counts up the number of items found
       final HashMap<ValueHolderIden, Integer> seenIdens = new HashMap<>(); // iden -> idenId
       sb.append(" .oldToNew:\n");
-      for (IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
+      for (final IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
         final ValueHolderIden iden = ioc.value.iden();
         if (!seenIdens.containsKey(iden)) {
           seenIdens.put(iden, ++idenId);
@@ -501,7 +495,7 @@ public class InstructionModifier extends MethodVisitor {
       }
 
       sb.append(" .oldLocalToFirst:\n");
-      for (IntIntCursor iic : oldLocalToFirst) {
+      for (final IntIntCursor iic : oldLocalToFirst) {
         sb.append("  " + iic.key + " => " + iic.value + '\n');
         ++itemCount;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
index 6e981bc..adbf2fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
@@ -60,14 +60,14 @@ public class ScalarReplacementNode extends MethodNode {
     Frame<BasicValue>[] frames;
     try {
       frames = analyzer.analyze(className, this);
-    } catch (AnalyzerException e) {
+    } catch (final AnalyzerException e) {
       throw new IllegalStateException(e);
     }
 
-    if (logger.isDebugEnabled()) {
+    if (logger.isTraceEnabled()) {
       final StringBuilder sb = new StringBuilder();
       sb.append("ReplacingBasicValues for " + className + "\n");
-      for(ReplacingBasicValue value : valueList) {
+      for(final ReplacingBasicValue value : valueList) {
         value.dump(sb, 2);
         sb.append('\n');
       }
@@ -75,14 +75,14 @@ public class ScalarReplacementNode extends MethodNode {
     }
 
     // wrap the instruction handler so that we can do additional things
-    TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
+    final TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
     this.instructions = list;
 
     MethodVisitor methodVisitor = inner;
     if (verifyBytecode) {
       methodVisitor = new CheckMethodVisitorFsm(CompilationConfig.ASM_API_VERSION, methodVisitor);
     }
-    InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
+    final InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
         this.signature, this.exceptionsArr, list, methodVisitor);
     accept(holderV);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 2d22d84..c953bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -58,6 +58,7 @@ public class BufferManager implements AutoCloseable {
         ((DrillBuf)mbuffers[i]).release();
       }
     }
+    managedBuffers.clear();
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 44ca78a..c46613d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.ops;
 
-import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -27,7 +26,6 @@ import java.util.Map;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
-import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -53,6 +51,8 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 /**
@@ -72,31 +72,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private IncomingBuffers buffers;
   private final OptionManager fragmentOptions;
   private final BufferManager bufferManager;
+  private ExecutorState executorState;
 
-  private final DeferredException deferredException = new DeferredException();
-  private volatile FragmentContextState state = FragmentContextState.OK;
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-
     @Override
-    public void accept(RpcException e) {
+    public void accept(final RpcException e) {
       fail(e);
     }
   };
+
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
   private final AccountingUserConnection accountingUserConnection;
 
-  /*
-   * TODO we need a state that indicates that cancellation has been requested and
-   * is in progress. Early termination (such as from limit queries) could also use
-   * this, as the cleanup steps should be exactly the same.
-   */
-  private static enum FragmentContextState {
-    OK,
-    FAILED,
-    CANCELED
-  }
-
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
     throws ExecutionSetupException {
@@ -111,14 +99,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
 
     try {
-      OptionList list;
+      final OptionList list;
       if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
         list = new OptionList();
       } else {
         list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
       }
       fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
 
@@ -127,8 +115,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     try {
       allocator = context.getAllocator().getChildAllocator(
           this, fragment.getMemInitial(), fragment.getMemMax(), true);
-      assert (allocator != null);
-    } catch(Throwable e) {
+      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
+    } catch(final Throwable e) {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
     }
 
@@ -140,37 +128,29 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return fragmentOptions;
   }
 
-  public void setBuffers(IncomingBuffers buffers) {
+  public void setBuffers(final IncomingBuffers buffers) {
+    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
     this.buffers = buffers;
   }
 
-  public void fail(Throwable cause) {
-    final FragmentHandle fragmentHandle = fragment.getHandle();
-
-    UserException dse = UserException.systemError(cause).addIdentity(getIdentity()).build();
-
-    // log the error id
-    logger.error("Fragment Context received failure -- Fragment: {}:{}",
-      fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), dse);
-
-    setState(FragmentContextState.FAILED);
-    deferredException.addThrowable(dse);
+  public void setExecutorState(final ExecutorState executorState) {
+    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+    this.executorState = executorState;
   }
 
-  public void cancel() {
-    setState(FragmentContextState.CANCELED);
+  public void fail(final Throwable cause) {
+    executorState.fail(cause);
   }
 
   /**
-   * Allowed transitions from left to right: OK -> FAILED -> CANCELED
-   * @param newState
+   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
+   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
+   * often so that Drill is responsive to cancellation operations.
+   *
+   * @return false if the action should terminate immediately, true if everything is okay.
    */
-  private synchronized void setState(FragmentContextState newState) {
-    if (state == FragmentContextState.OK) {
-      state = newState;
-    } else if (newState == FragmentContextState.CANCELED) {
-      state = newState;
-    }
+  public boolean shouldContinue() {
+    return executorState.shouldContinue();
   }
 
   public DrillbitContext getDrillbitContext() {
@@ -224,6 +204,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return frag;
   }
 
+
+
   /**
    * Get this fragment's allocator.
    * @return the allocator
@@ -252,11 +234,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getCompiler().getImplementationClass(cg);
   }
 
-  public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
     return getImplementationClass(cg.getCodeGenerator(), instanceCount);
   }
 
-  public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
     return context.getCompiler().getImplementationClass(cg, instanceCount);
   }
 
@@ -269,7 +251,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getController().getTunnel(endpoint);
   }
 
-  public AccountingDataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
     AccountingDataTunnel tunnel = tunnels.get(endpoint);
     if (tunnel == null) {
       tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
@@ -282,16 +264,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return buffers;
   }
 
+  @VisibleForTesting
+  @Deprecated
   public Throwable getFailureCause() {
-    return deferredException.getException();
+    return executorState.getFailureCause();
   }
 
+  @VisibleForTesting
+  @Deprecated
   public boolean isFailed() {
-    return state == FragmentContextState.FAILED;
-  }
-
-  public boolean isCancelled() {
-    return state == FragmentContextState.CANCELED;
+    return executorState.isFailed();
   }
 
   public FunctionImplementationRegistry getFunctionRegistry() {
@@ -306,24 +288,25 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     allocator.setFragmentLimit(limit);
   }
 
-  public DeferredException getDeferredException() {
-    return deferredException;
-  }
-
   @Override
-  public void close() throws Exception {
-    /*
-     * TODO wait for threads working on this Fragment to terminate (or at least stop working
-     * on this Fragment's query)
-     */
-    deferredException.suppressingClose(bufferManager);
-    deferredException.suppressingClose(buffers);
-    deferredException.suppressingClose(allocator);
+  public void close() {
+    waitForSendComplete();
+    suppressingClose(bufferManager);
+    suppressingClose(buffers);
+    suppressingClose(allocator);
+  }
 
-    deferredException.close(); // must be last, as this may throw
+  private void suppressingClose(final AutoCloseable closeable) {
+    try {
+      if (closeable != null) {
+        closeable.close();
+      }
+    } catch (final Exception e) {
+      fail(e);
+    }
   }
 
-  public DrillBuf replace(DrillBuf old, int newSize) {
+  public DrillBuf replace(final DrillBuf old, final int newSize) {
     return bufferManager.replace(old, newSize);
   }
 
@@ -332,7 +315,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return bufferManager.getManagedBuffer();
   }
 
-  public DrillBuf getManagedBuffer(int size) {
+  public DrillBuf getManagedBuffer(final int size) {
     return bufferManager.getManagedBuffer(size);
   }
 
@@ -350,4 +333,30 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     sendingAccountor.waitForSendComplete();
   }
 
+  public interface ExecutorState {
+    /**
+     * Whether execution should continue.
+     *
+     * @return false if execution should stop.
+     */
+    public boolean shouldContinue();
+
+    /**
+     * Inform the executor if a exception occurs and fragment should be failed.
+     *
+     * @param t
+     *          The exception that occurred.
+     */
+    public void fail(final Throwable t);
+
+    @VisibleForTesting
+    @Deprecated
+    public boolean isFailed();
+
+    @VisibleForTesting
+    @Deprecated
+    public Throwable getFailureCause();
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 5b7ca66..628dcd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -34,7 +34,7 @@ public abstract class BaseRootExec implements RootExec {
   protected OperatorContext oContext = null;
   protected FragmentContext fragmentContext = null;
 
-  public BaseRootExec(FragmentContext fragmentContext, PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = new OperatorContext(config, fragmentContext, stats, true);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -43,7 +43,7 @@ public abstract class BaseRootExec implements RootExec {
     this.fragmentContext = fragmentContext;
   }
 
-  public BaseRootExec(FragmentContext fragmentContext, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
       config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -56,7 +56,7 @@ public abstract class BaseRootExec implements RootExec {
   public final boolean next() {
     // Stats should have been initialized
     assert stats != null;
-    if (fragmentContext.isFailed()) {
+    if (!fragmentContext.shouldContinue()) {
       return false;
     }
     try {
@@ -67,7 +67,7 @@ public abstract class BaseRootExec implements RootExec {
     }
   }
 
-  public final IterOutcome next(RecordBatch b){
+  public final IterOutcome next(final RecordBatch b){
     stats.stopProcessing();
     IterOutcome next;
     try {
@@ -90,7 +90,7 @@ public abstract class BaseRootExec implements RootExec {
   public abstract boolean innerNext();
 
   @Override
-  public void receivingFragmentFinished(FragmentHandle handle) {
+  public void receivingFragmentFinished(final FragmentHandle handle) {
     logger.warn("Currently not handling FinishedFragment message");
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e230fd2..b8ef690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -77,7 +76,6 @@ import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
 
 import parquet.Preconditions;
 
@@ -97,12 +95,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
   private RecordBatchLoader[] batchLoaders;
-  private RawFragmentBatchProvider[] fragProviders;
-  private FragmentContext context;
+  private final RawFragmentBatchProvider[] fragProviders;
+  private final FragmentContext context;
   private BatchSchema schema;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
-  private MergingReceiverPOP config;
+  private final MergingReceiverPOP config;
   private boolean hasRun = false;
   private boolean prevBatchWasFull = false;
   private boolean hasMoreIncoming = true;
@@ -126,9 +124,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
-  public MergingRecordBatch(FragmentContext context,
-                            MergingReceiverPOP config,
-                            RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
+  public MergingRecordBatch(final FragmentContext context,
+                            final MergingReceiverPOP config,
+                            final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
     super(config, context, true, new OperatorContext(config, context, false));
     //super(config, context);
     this.fragProviders = fragProviders;
@@ -138,10 +136,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.config = config;
   }
 
-  private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+  private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
     stats.startWait();
     try {
-      RawFragmentBatch b = provider.getNext();
+      final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
@@ -177,9 +175,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       schemaChanged = true; // first iteration is always a schema change
 
       // set up each (non-empty) incoming record batch
-      List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+      final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
       int p = 0;
-      for (RawFragmentBatchProvider provider : fragProviders) {
+      for (final RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
           // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
@@ -190,10 +188,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatch = getNext(provider);
           }
           p++;
-          if (rawBatch == null && context.isCancelled()) {
+          if (rawBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
@@ -208,10 +206,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
               ;
             }
-            if (rawBatch == null && context.isCancelled()) {
+            if (rawBatch == null && !context.shouldContinue()) {
               return IterOutcome.STOP;
             }
-          } catch (IOException e) {
+          } catch (final IOException e) {
             context.fail(e);
             return IterOutcome.STOP;
           }
@@ -234,12 +232,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       int i = 0;
-      for (RawFragmentBatch batch : incomingBatches) {
+      for (final RawFragmentBatch batch : incomingBatches) {
         // initialize the incoming batchLoaders
-        UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+        final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
         try {
           batchLoaders[i].load(rbd, batch.getBody());
-        } catch(SchemaChangeException e) {
+        } catch(final SchemaChangeException e) {
           logger.error("MergingReceiver failed to load record batch from remote host.  {}", e);
           context.fail(e);
           return IterOutcome.STOP;
@@ -250,7 +248,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
-      for (RecordBatchLoader loader : batchLoaders) {
+      for (final RecordBatchLoader loader : batchLoaders) {
         loader.canonicalize();
       }
 
@@ -262,15 +260,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // create the outgoing schema and vector container, and allocate the initial batch
-      SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+      final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
       int vectorCount = 0;
-      for (VectorWrapper<?> v : batchLoaders[0]) {
+      for (final VectorWrapper<?> v : batchLoaders[0]) {
 
         // add field to the output schema
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
+        final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
         ++vectorCount;
       }
       allocateOutgoing();
@@ -286,7 +284,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // generate code for merge operations (copy and compare)
       try {
         merger = createMerger();
-      } catch (SchemaChangeException e) {
+      } catch (final SchemaChangeException e) {
         logger.error("Failed to generate code for MergingReceiver.  {}", e);
         context.fail(e);
         return IterOutcome.STOP;
@@ -294,9 +292,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // allocate the priority queue with the generated comparator
       this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
-        public int compare(Node node1, Node node2) {
-          int leftIndex = (node1.batchId << 16) + node1.valueIndex;
-          int rightIndex = (node2.batchId << 16) + node2.valueIndex;
+        public int compare(final Node node1, final Node node2) {
+          final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
+          final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
           return merger.doEval(leftIndex, rightIndex);
         }
       });
@@ -305,14 +303,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       for (int b = 0; b < senderCount; ++b) {
         while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
           try {
-            RawFragmentBatch batch = getNext(fragProviders[b]);
+            final RawFragmentBatch batch = getNext(fragProviders[b]);
             incomingBatches[b] = batch;
             if (batch != null) {
               batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
-              if (context.isCancelled()) {
+              if (!context.shouldContinue()) {
                 return IterOutcome.STOP;
               }
             }
@@ -332,7 +330,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
     while (!pqueue.isEmpty()) {
       // pop next value from pq and copy to outgoing batch
-      Node node = pqueue.peek();
+      final Node node = pqueue.peek();
       if (!copyRecordToOutgoingBatch(node)) {
         logger.debug("Outgoing vectors space is full; breaking");
         prevBatchWasFull = true;
@@ -355,10 +353,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
             nextBatch = getNext(fragProviders[node.batchId]);
           }
-          if (nextBatch == null && context.isCancelled()) {
+          if (nextBatch == null && !context.shouldContinue()) {
             return IterOutcome.STOP;
           }
-        } catch (IOException e) {
+        } catch (final IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
         }
@@ -369,7 +367,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           // batch is empty
           boolean allBatchesEmpty = true;
 
-          for (RawFragmentBatch batch : incomingBatches) {
+          for (final RawFragmentBatch batch : incomingBatches) {
             // see if all batches are empty so we can return OK_* or NONE
             if (batch != null) {
               allBatchesEmpty = false;
@@ -387,10 +385,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           continue;
         }
 
-        UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
+        final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
         try {
           batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
-        } catch(SchemaChangeException ex) {
+        } catch(final SchemaChangeException ex) {
           context.fail(ex);
           return IterOutcome.STOP;
         }
@@ -412,7 +410,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
 
     // set the value counts in the outgoing vectors
-    for (VectorWrapper vw : outgoingContainer) {
+    for (final VectorWrapper vw : outgoingContainer) {
       vw.getValueVector().getMutator().setValueCount(outgoingPosition);
     }
 
@@ -448,19 +446,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           state = BatchState.DONE;
           return;
         }
-        RawFragmentBatch batch = getNext(fragProviders[i]);
+        final RawFragmentBatch batch = getNext(fragProviders[i]);
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
           continue;
         }
         tempBatchHolder[i] = batch;
-        for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
-          ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+        for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+          final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
           v.allocateNew();
         }
         break;
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
     outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
@@ -473,27 +471,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
     } else {
       cleanup();
-      for (RawFragmentBatchProvider provider : fragProviders) {
+      for (final RawFragmentBatchProvider provider : fragProviders) {
         provider.kill(context);
       }
     }
   }
 
   private void informSenders() {
-    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+    logger.info("Informing senders of request to terminate sending.");
+    final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
-      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+    for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+      final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
               .setMinorFragmentId(providingEndpoint.getId())
               .build();
-      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+      final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
@@ -504,18 +503,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
-    public void failed(RpcException ex) {
+    public void failed(final RpcException ex) {
       logger.warn("Failed to inform upstream that receiver is finished");
     }
 
     @Override
-    public void success(Ack value, ByteBuf buffer) {
+    public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     //No op
   }
 
@@ -535,12 +534,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return outgoingContainer.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return outgoingContainer.getValueAccessorById(clazz, ids);
   }
 
@@ -549,10 +548,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     return WritableBatch.get(this);
   }
 
-  private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
+  private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
     Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
 
-    BatchSchema schema = batchLoaders[0].getSchema();
+    final BatchSchema schema = batchLoaders[0].getSchema();
 
     for (int i = 1; i < batchLoaders.length; i++) {
       if (!schema.equals(batchLoaders[i].getSchema())) {
@@ -564,8 +563,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private void allocateOutgoing() {
-    for (VectorWrapper w : outgoingContainer) {
-      ValueVector v = w.getValueVector();
+    for (final VectorWrapper w : outgoingContainer) {
+      final ValueVector v = w.getValueVector();
       if (v instanceof FixedWidthVector) {
         AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
       } else {
@@ -587,12 +586,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
 
     try {
-      CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-      ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
+      final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+      final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
 
       ExpandableHyperContainer batch = null;
       boolean first = true;
-      for (RecordBatchLoader loader : batchLoaders) {
+      for (final RecordBatchLoader loader : batchLoaders) {
         if (first) {
           batch = new ExpandableHyperContainer(loader);
           first = false;
@@ -606,7 +605,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       g.setMappingSet(COPIER_MAPPING_SET);
       CopyUtil.generateCopies(g, batch, true);
       g.setMappingSet(MAIN_MAPPING);
-      MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
+      final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
 
       merger.doSetup(context, batch, outgoingContainer);
       return merger;
@@ -621,28 +620,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
   public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
 
-  private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+  private void generateComparisons(final ClassGenerator g, final VectorAccessible batch) throws SchemaChangeException {
     g.setMappingSet(MAIN_MAPPING);
 
-    for (Ordering od : popConfig.getOrderings()) {
+    for (final Ordering od : popConfig.getOrderings()) {
       // first, we rewrite the evaluation stack for each side of the comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
+      final ErrorCollector collector = new ErrorCollectorImpl();
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
       g.setMappingSet(LEFT_MAPPING);
-      HoldingContainer left = g.addExpr(expr, false);
+      final HoldingContainer left = g.addExpr(expr, false);
       g.setMappingSet(RIGHT_MAPPING);
-      HoldingContainer right = g.addExpr(expr, false);
+      final HoldingContainer right = g.addExpr(expr, false);
       g.setMappingSet(MAIN_MAPPING);
 
       // next we wrap the two comparison sides and add the expression block for the comparison.
-      LogicalExpression fh =
+      final LogicalExpression fh =
           FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
                                                          context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, false);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      final HoldingContainer out = g.addExpr(fh, false);
+      final JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 
       if (od.getDirection() == Direction.ASCENDING) {
         jc._then()._return(out.getValue());
@@ -660,8 +659,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
    *
    * @param node Reference to the next record to copy from the incoming batches
    */
-  private boolean copyRecordToOutgoingBatch(Node node) {
-    int inIndex = (node.batchId << 16) + node.valueIndex;
+  private boolean copyRecordToOutgoingBatch(final Node node) {
+    final int inIndex = (node.batchId << 16) + node.valueIndex;
     merger.doCopy(inIndex, outgoingPosition);
     outgoingPosition++;
     if (outgoingPosition == OUTGOING_BATCH_SIZE) {
@@ -677,7 +676,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public class Node {
     public int batchId;      // incoming batch
     public int valueIndex;   // value within the batch
-    Node(int batchId, int valueIndex) {
+    Node(final int batchId, final int valueIndex) {
       this.batchId = batchId;
       this.valueIndex = valueIndex;
     }
@@ -687,7 +686,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public void cleanup() {
     outgoingContainer.clear();
     if (batchLoaders != null) {
-      for (RecordBatchLoader rbl : batchLoaders) {
+      for (final RecordBatchLoader rbl : batchLoaders) {
         if (rbl != null) {
           rbl.clear();
         }
@@ -695,7 +694,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
     oContext.close();
     if (fragProviders != null) {
-      for (RawFragmentBatchProvider f : fragProviders) {
+      for (final RawFragmentBatchProvider f : fragProviders) {
         f.cleanup();
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c50cb8a..c2d6166 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,10 +20,8 @@ package org.apache.drill.exec.physical.impl.producer;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -42,16 +40,16 @@ import org.apache.drill.exec.vector.ValueVector;
 public class ProducerConsumerBatch extends AbstractRecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
 
-  private RecordBatch incoming;
-  private Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
+  private final RecordBatch incoming;
+  private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
   private boolean running = false;
-  private BlockingDeque<RecordBatchDataWrapper> queue;
+  private final BlockingDeque<RecordBatchDataWrapper> queue;
   private int recordCount;
   private BatchSchema schema;
   private boolean stop = false;
   private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
 
-  protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
     this.queue = new LinkedBlockingDeque<>(popConfig.getSize());
@@ -68,8 +66,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       stats.startWait();
       wrapper = queue.take();
       logger.debug("Got batch from queue");
-    } catch (InterruptedException e) {
-      if (!(context.isCancelled() || context.isFailed())) {
+    } catch (final InterruptedException e) {
+      if (!context.shouldContinue()) {
         context.fail(e);
       }
       return IterOutcome.STOP;
@@ -84,30 +82,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     }
 
     recordCount = wrapper.batch.getRecordCount();
-    boolean newSchema = load(wrapper.batch);
+    final boolean newSchema = load(wrapper.batch);
 
     return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
   }
 
-  private boolean load(RecordBatchData batch) {
-    VectorContainer newContainer = batch.getContainer();
+  private boolean load(final RecordBatchData batch) {
+    final VectorContainer newContainer = batch.getContainer();
     if (schema != null && newContainer.getSchema().equals(schema)) {
       container.zeroVectors();
-      BatchSchema schema = container.getSchema();
+      final BatchSchema schema = container.getSchema();
       for (int i = 0; i < container.getNumberOfColumns(); i++) {
-        MaterializedField field = schema.getColumn(i);
-        MajorType type = field.getType();
-        ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+        final MaterializedField field = schema.getColumn(i);
+        final MajorType type = field.getType();
+        final ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
                 container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
-        ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+        final ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
                 newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
-        TransferPair tp = vIn.makeTransferPair(vOut);
+        final TransferPair tp = vIn.makeTransferPair(vOut);
         tp.transfer();
       }
       return false;
     } else {
       container.clear();
-      for (VectorWrapper w : newContainer) {
+      for (final VectorWrapper w : newContainer) {
         container.add(w.getValueVector());
       }
       container.buildSchema(SelectionVectorMode.NONE);
@@ -128,7 +126,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
         }
         outer:
         while (true) {
-          IterOutcome upstream = incoming.next();
+          final IterOutcome upstream = incoming.next();
           switch (upstream) {
             case NONE:
               stop = true;
@@ -146,7 +144,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
               throw new UnsupportedOperationException();
           }
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
         // TODO InterruptedException
       } finally {
@@ -154,7 +152,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           try {
             clearQueue();
             queue.put(new RecordBatchDataWrapper(null, true, false));
-          } catch (InterruptedException e) {
+          } catch (final InterruptedException e) {
             logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
             // TODO InterruptedException
           }
@@ -177,12 +175,12 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     stop = true;
     producer.interrupt();
     try {
       producer.join();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer thread");
       // TODO InterruptedException
     }
@@ -193,7 +191,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     stop = true;
     try {
       cleanUpLatch.await();
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
       // TODO InterruptedException
     } finally {
@@ -213,7 +211,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     boolean finished;
     boolean failed;
 
-    RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean failed) {
+    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
       this.batch = batch;
       this.finished = finished;
       this.failed = failed;