Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/15 14:12:31 UTC

[2/5] drill git commit: DRILL-1942-hygiene:
- add AutoCloseable to many classes
- minor fixes
- formatting
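
For readers outside the codebase, the pattern this commit rolls out is the standard Java resource idiom: a class that owns buffers or handles implements AutoCloseable, narrows close() so it no longer throws the checked Exception, and call sites use try-with-resources. A minimal, self-contained sketch (all names hypothetical; none of this is Drill code):

    class AutoCloseableSketch {
      static class Resource {
        void free() { /* release buffers, file handles, etc. */ }
      }

      static class Holder implements AutoCloseable {
        private final Resource resource = new Resource();

        void doWork() { /* may throw at any point */ }

        @Override
        public void close() { // narrowed from "throws Exception": callers need no catch
          resource.free();
        }
      }

      public static void main(String[] args) {
        // close() runs automatically, even if doWork() throws.
        try (Holder h = new Holder()) {
          h.doWork();
        }
      }
    }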

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index c1d78c3..432e06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Filter;
@@ -44,11 +43,10 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.google.common.collect.Lists;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
 
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
-  private BufferAllocator.PreAllocator svAllocator;
   private Filterer filter;
 
   public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
@@ -84,7 +82,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     return IterOutcome.OK;
   }
 
-
   @Override
   public void close() {
     if (sv2 != null) {
@@ -152,15 +149,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-//    for (VectorWrapper<?> i : incoming) {
-//      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
-//      container.add(v);
-//      allocators.add(getAllocator4(v));
-//    }
-
-    for (VectorWrapper<?> vw : incoming) {
-      for (ValueVector vv : vw.getValueVectors()) {
-        TransferPair pair = vv.getTransferPair();
+    for (final VectorWrapper<?> vw : incoming) {
+      for (final ValueVector vv : vw.getValueVectors()) {
+        final TransferPair pair = vv.getTransferPair();
         container.add(pair.getTo());
         transfers.add(pair);
       }
@@ -170,8 +161,8 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     container.buildSchema(SelectionVectorMode.FOUR_BYTE);
 
     try {
-      TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
-      Filterer filter = context.getImplementationClass(cg);
+      final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+      final Filterer filter = context.getImplementationClass(cg);
       filter.setup(context, incoming, this, tx);
       return filter;
     } catch (ClassTransformationException | IOException e) {
@@ -192,21 +183,18 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-    for (VectorWrapper<?> v : incoming) {
-      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
+    for (final VectorWrapper<?> v : incoming) {
+      final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }
 
-
     try {
-      TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
-      Filterer filter = context.getImplementationClass(cg);
+      final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+      final Filterer filter = context.getImplementationClass(cg);
       filter.setup(context, incoming, this, tx);
       return filter;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
-
   }
-
 }
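
The rewritten loop in FilterRecordBatch above is Drill's transfer-pair idiom; annotated, the same code reads:

    // A TransferPair binds an incoming vector to a fresh outgoing vector of the
    // same type; calling transfer() later moves the underlying buffers into the
    // output container (ownership changes hands, no data is copied).
    for (final VectorWrapper<?> vw : incoming) {
      for (final ValueVector vv : vw.getValueVectors()) {
        final TransferPair pair = vv.getTransferPair();
        container.add(pair.getTo()); // register the destination vector up front
        transfers.add(pair);         // saved so each batch can be transferred later
      }
    }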

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 73f3435..21ebd9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -63,7 +63,6 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;
 
 public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
-
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
@@ -186,8 +185,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     hjHelper = new HashJoinHelper(context, oContext.getAllocator());
     try {
       rightSchema = right.getSchema();
-      VectorContainer vectors = new VectorContainer(oContext);
-      for (VectorWrapper w : right) {
+      final VectorContainer vectors = new VectorContainer(oContext);
+      for (final VectorWrapper<?> w : right) {
         vectors.addOrGet(w.getField());
       }
       vectors.buildSchema(SelectionVectorMode.NONE);
@@ -198,7 +197,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       setupHashTable();
       hashJoinProbe = setupHashJoinProbe();
       // Build the container schema and set the counts
-      for (VectorWrapper w : container) {
+      for (final VectorWrapper<?> w : container) {
         w.getValueVector().allocateNew();
       }
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -243,8 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             state = BatchState.NOT_FIRST;
           }
 
-
-          for (VectorWrapper<?> v : container) {
+          for (final VectorWrapper<?> v : container) {
             v.getValueVector().getMutator().setValueCount(outputRecords);
           }
 
@@ -253,13 +251,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       } else {
         // Our build side is empty, we won't have any matches, clear the probe side
         if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (VectorWrapper<?> wrapper : left) {
+          for (final VectorWrapper<?> wrapper : left) {
             wrapper.getValueVector().clear();
           }
           left.kill(true);
           leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
           while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-            for (VectorWrapper<?> wrapper : left) {
+            for (final VectorWrapper<?> wrapper : left) {
               wrapper.getValueVector().clear();
             }
             leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
@@ -281,11 +279,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   }
 
   public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
-
     // Setup the hash table configuration object
     int conditionsSize = conditions.size();
-
-    NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+    final NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
     NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
 
     JoinComparator comparator = JoinComparator.NONE;
@@ -299,7 +295,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     assert comparator != JoinComparator.NONE;
-    boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
+    final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
 
     // Set the left named expression to be null if the probe batch is empty.
     if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
@@ -310,24 +306,23 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       }
     }
 
-    HashTableConfig htConfig =
+    final HashTableConfig htConfig =
         new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
             HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
     // Create the chained hash table
-    ChainedHashTable ht =
+    final ChainedHashTable ht =
         new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null,
             areNullsEqual);
     hashTable = ht.createAndSetupHashTable(null);
   }
 
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
     //Setup the underlying hash table
 
     // skip first batch if count is zero, as it may be an empty schema batch
     if (right.getRecordCount() == 0) {
-      for (VectorWrapper w : right) {
+      for (final VectorWrapper<?> w : right) {
         w.clear();
       }
       rightUpstream = next(right);
@@ -336,9 +331,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     boolean moreData = true;
 
     while (moreData) {
-
       switch (rightUpstream) {
-
       case OUT_OF_MEMORY:
       case NONE:
       case NOT_YET:
@@ -362,7 +355,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         }
         // Fall through
       case OK:
-        int currentRecordCount = right.getRecordCount();
+        final int currentRecordCount = right.getRecordCount();
 
                     /* For every new build batch, we store some state in the helper context
                      * Add new state to the helper context
@@ -370,11 +363,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         hjHelper.addNewBatch(currentRecordCount);
 
         // Holder contains the global index where the key is hashed into using the hash table
-        IndexPointer htIndex = new IndexPointer();
+        final IndexPointer htIndex = new IndexPointer();
 
         // For every record in the build batch , hash the key columns
         for (int i = 0; i < currentRecordCount; i++) {
-
           hashTable.put(i, htIndex, 1 /* retry count */);
 
                         /* Use the global index returned by the hash table, to store
@@ -388,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                      * to the hyper vector container. Will be used when we want to retrieve
                      * records that have matching keys on the probe side.
                      */
-        RecordBatchData nextBatch = new RecordBatchData(right);
+        final RecordBatchData nextBatch = new RecordBatchData(right);
         boolean success = false;
         try {
           if (hyperContainer == null) {
@@ -412,27 +404,22 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
   }
 
-
   public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
-
-
     final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-    ClassGenerator<HashJoinProbe> g = cg.getRoot();
+    final ClassGenerator<HashJoinProbe> g = cg.getRoot();
 
     // Generate the code to project build side records
     g.setMappingSet(projectBuildMapping);
 
-
     int fieldId = 0;
-    JExpression buildIndex = JExpr.direct("buildIndex");
-    JExpression outIndex = JExpr.direct("outIndex");
+    final JExpression buildIndex = JExpr.direct("buildIndex");
+    final JExpression outIndex = JExpr.direct("outIndex");
     g.rotateBlock();
 
     if (rightSchema != null) {
-      for (MaterializedField field : rightSchema) {
-
-        MajorType inputType = field.getType();
-        MajorType outputType;
+      for (final MaterializedField field : rightSchema) {
+        final MajorType inputType = field.getType();
+        final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
         if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
@@ -447,8 +434,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         // Add the vector to our output container
         container.addOrGet(projected);
 
-        JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
-        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+        final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
+        final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
         g.getEvalBlock().add(outVV.invoke("copyFromSafe")
             .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
             .arg(outIndex)
@@ -463,14 +450,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     int outputFieldId = fieldId;
     fieldId = 0;
-    JExpression probeIndex = JExpr.direct("probeIndex");
-    int recordCount = 0;
+    final JExpression probeIndex = JExpr.direct("probeIndex");
 
     if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
-      for (VectorWrapper<?> vv : left) {
-
-        MajorType inputType = vv.getField().getType();
-        MajorType outputType;
+      for (final VectorWrapper<?> vv : left) {
+        final MajorType inputType = vv.getField().getType();
+        final MajorType outputType;
 
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
@@ -481,30 +466,28 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
           outputType = inputType;
         }
 
-        ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
+        final ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
         if (v instanceof AbstractContainerVector) {
           vv.getValueVector().makeTransferPair(v);
           v.clear();
         }
 
-        JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
-        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+        final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+        final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
         g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
 
         fieldId++;
         outputFieldId++;
       }
-      recordCount = left.getRecordCount();
     }
 
-    HashJoinProbe hj = context.getImplementationClass(cg);
-
+    final HashJoinProbe hj = context.getImplementationClass(cg);
     return hj;
   }
 
   private void allocateVectors() {
-    for (VectorWrapper<?> v : container) {
+    for (final VectorWrapper<?> v : container) {
       v.getValueVector().allocateNew();
     }
   }
@@ -514,8 +497,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     super(popConfig, context, true);
     this.left = left;
     this.right = right;
-    this.joinType = popConfig.getJoinType();
-    this.conditions = popConfig.getConditions();
+    joinType = popConfig.getJoinType();
+    conditions = popConfig.getConditions();
   }
 
   private void updateStats(HashTable htable) {
@@ -523,16 +506,16 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       return;
     }
     htable.getStats(htStats);
-    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
   }
 
   @Override
   public void killIncoming(boolean sendUpstream) {
-    this.left.kill(sendUpstream);
-    this.right.kill(sendUpstream);
+    left.kill(sendUpstream);
+    right.kill(sendUpstream);
   }
 
   @Override
@@ -551,5 +534,4 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
     super.close();
   }
-
 }
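
The comment citing DRILL-2197 in the hunk above encodes a join subtlety: in a LEFT or FULL outer join, build-side columns may be unmatched, so a REQUIRED input type must be widened to a nullable output type — except maps, which have no nullable form. The hunk cuts off mid-condition, so the following is a hedged reconstruction; asOptional() stands in for whatever mode-override helper the real code calls:

    // Sketch of the output-type decision from the hunk above. The map exclusion
    // and the mode-override helper are elided by the hunk, so asOptional() is a
    // hypothetical stand-in; see DRILL-2197 for why maps are skipped.
    final MajorType inputType = field.getType();
    final MajorType outputType;
    if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL)
        && inputType.getMode() == DataMode.REQUIRED
        && inputType.getMinorType() != MinorType.MAP) { // maps cannot be made nullable
      outputType = asOptional(inputType);               // unmatched build rows need null slots
    } else {
      outputType = inputType;
    }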

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2d37fa5..8f2dad3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -39,9 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.server.options.DrillConfigIterator.Iter;
 import org.apache.drill.exec.vector.AllocationHelper;
-
 import com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -183,7 +181,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
     outputRecords = nljWorker.outputRecords();
 
     // Set the record count
-    for (VectorWrapper vw : container) {
+    for (final VectorWrapper<?> vw : container) {
       vw.getValueVector().getMutator().setValueCount(outputRecords);
     }
 
@@ -202,7 +200,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
     }
     right.kill(true);
     while (hasMore(rightUpstream)) {
-      for (VectorWrapper<?> wrapper : right) {
+      for (final VectorWrapper<?> wrapper : right) {
         wrapper.getValueVector().clear();
       }
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
@@ -280,7 +278,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
    * Simple method to allocate space for all the vectors in the container.
    */
   private void allocateVectors() {
-    for (VectorWrapper vw : container) {
+    for (final VectorWrapper<?> vw : container) {
       AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
     }
   }
@@ -309,7 +307,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
 
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
-        for (VectorWrapper vw : left) {
+        for (final VectorWrapper<?> vw : left) {
           container.addOrGet(vw.getField());
         }
 
@@ -321,7 +319,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
 
       if (rightUpstream != IterOutcome.NONE) {
         rightSchema = right.getSchema();
-        for (VectorWrapper vw : right) {
+        for (final VectorWrapper<?> vw : right) {
           container.addOrGet(vw.getField());
         }
         addBatchToHyperContainer(right);
@@ -341,7 +339,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
   }
 
   private void addBatchToHyperContainer(RecordBatch inputBatch) {
-    RecordBatchData batchCopy = new RecordBatchData(inputBatch);
+    final RecordBatchData batchCopy = new RecordBatchData(inputBatch);
     boolean success = false;
     try {
       rightCounts.addLast(inputBatch.getRecordCount());
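
Both this batch and HashJoinBatch guard a freshly copied RecordBatchData with the success-flag idiom visible above: if anything throws before ownership is handed off, the finally block releases the copy; after a successful hand-off, cleanup belongs to the receiver. Its generic shape, with Copy, makeCopy(), and handOff() as placeholder names:

    // Generic form of the success-flag idiom used around RecordBatchData above.
    final Copy copy = makeCopy();      // acquires buffers
    boolean success = false;
    try {
      handOff(copy);                   // may throw before ownership transfers
      success = true;
    } finally {
      if (!success) {
        copy.clear();                  // failure path only; on success the receiver owns it
      }
    }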

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 4ea5a5c..06dd699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -33,19 +33,19 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import com.google.common.collect.Lists;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
   private int recordsToSkip;
   private int recordsLeft;
-  private boolean noEndLimit;
+  private final boolean noEndLimit;
   private boolean skipBatch;
   private boolean first = true;
-  List<TransferPair> transfers = Lists.newArrayList();
+  private final List<TransferPair> transfers = Lists.newArrayList();
 
-  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+      throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
     recordsToSkip = popConfig.getFirst();
@@ -62,14 +62,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     transfers.clear();
 
 
-    for(VectorWrapper<?> v : incoming){
-      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
+    for(final VectorWrapper<?> v : incoming) {
+      final TransferPair pair = v.getValueVector().makeTransferPair(
+          container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }
 
-    BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+    final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
 
-    switch(svMode){
+    switch(svMode) {
       case NONE:
         break;
       case TWO_BYTE:
@@ -98,7 +99,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       }
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-
         // Clear the memory for the incoming batch
         for (VectorWrapper<?> wrapper : incoming) {
           wrapper.getValueVector().clear();
@@ -126,15 +126,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       first = false;
     }
     skipBatch = false;
-    int recordCount = incoming.getRecordCount();
+    final int recordCount = incoming.getRecordCount();
     if (recordCount == 0) {
       skipBatch = true;
       return IterOutcome.OK;
     }
-    for(TransferPair tp : transfers) {
+    for(final TransferPair tp : transfers) {
       tp.transfer();
     }
-    if(recordCount <= recordsToSkip) {
+    if (recordCount <= recordsToSkip) {
       recordsToSkip -= recordCount;
       skipBatch = true;
     } else {
@@ -149,8 +149,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     return IterOutcome.OK;
   }
 
+  // These two functions are identical except for the computation of the index; merge
   private void limitWithNoSV(int recordCount) {
-    int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+    final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
     recordsToSkip -= offset;
     int fetch;
 
@@ -162,15 +163,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     }
 
     int svIndex = 0;
-    for(char i = (char) offset; i < fetch; i++) {
+    for(char i = (char) offset; i < fetch; svIndex++, i++) {
       outgoingSv.setIndex(svIndex, i);
-      svIndex++;
     }
     outgoingSv.setRecordCount(svIndex);
   }
 
   private void limitWithSV(int recordCount) {
-    int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+    final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
     recordsToSkip -= offset;
     int fetch;
 
@@ -182,10 +182,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     }
 
     int svIndex = 0;
-    for(int i = offset; i < fetch; i++) {
-      char index = incomingSv.getIndex(i);
+    for(int i = offset; i < fetch; svIndex++, i++) {
+      final char index = incomingSv.getIndex(i);
       outgoingSv.setIndex(svIndex, index);
-      svIndex++;
     }
     outgoingSv.setRecordCount(svIndex);
   }
@@ -196,9 +195,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  public void close(){
+  public void close() {
     outgoingSv.clear();
     super.close();
   }
-
 }
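
The new comment in limitWithNoSV() asks for the two limit functions to be merged; since they differ only in how the source index is computed, one possible merge looks like the sketch below. This is a sketch only: it assumes Java 8 lambdas (Drill at the time built on JDK 7, where a one-method interface would be the period-correct spelling), and computeFetch() is a placeholder for the fetch computation the hunks elide.

    // Hypothetical merge of limitWithNoSV()/limitWithSV().
    private void limit(final int recordCount, final java.util.function.IntUnaryOperator indexOf) {
      final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
      recordsToSkip -= offset;
      final int fetch = computeFetch(recordCount, offset); // placeholder; not shown in the diff
      int svIndex = 0;
      for (int i = offset; i < fetch; svIndex++, i++) {
        outgoingSv.setIndex(svIndex, (char) indexOf.applyAsInt(i));
      }
      outgoingSv.setRecordCount(svIndex);
    }

    // Call sites:
    //   limit(recordCount, i -> i);                        // no incoming selection vector
    //   limit(recordCount, i -> incomingSv.getIndex(i));   // TWO_BYTE selection vector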

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 0050b45..3061f99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -159,8 +159,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   @Override
   public void close() {
     super.close();
-    this.partitionVectors.clear();
-    this.partitionKeyVector.clear();
+    partitionVectors.clear();
+    partitionKeyVector.clear();
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 31fc160..6e49e78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -58,8 +58,7 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 
 public class PartitionSenderRootExec extends BaseRootExec {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private RecordBatch incoming;
   private HashPartitionSender operator;
   private PartitionerDecorator partitioner;
@@ -105,10 +104,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
-    this.outGoingBatchCount = operator.getDestinations().size();
-    this.popConfig = operator;
-    this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
-    this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+    outGoingBatchCount = operator.getDestinations().size();
+    popConfig = operator;
+    remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
+    remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
     stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
     // Algorithm to figure out number of threads to parallelize output
     // numberOfRows/sliceTarget/numReceivers/threadfactor
@@ -137,7 +136,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   @Override
   public boolean innerNext() {
-
     if (!ok) {
       return false;
     }
@@ -332,6 +330,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
+  @Override
   public void close() throws Exception {
     logger.debug("Partition sender stopping.");
     super.close();
@@ -340,7 +339,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
       updateAggregateStats();
       partitioner.clear();
     }
-
   }
 
   public void sendEmptyBatch(boolean isLast) {

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index b9a1641..98ee320 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -108,7 +108,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       return false;
     } else {
       container.clear();
-      for (final VectorWrapper w : newContainer) {
+      for (final VectorWrapper<?> w : newContainer) {
         container.add(w.getValueVector());
       }
       container.buildSchema(SelectionVectorMode.NONE);
@@ -118,7 +118,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   private class Producer implements Runnable {
-
     RecordBatchDataWrapper wrapper;
 
     @Override
@@ -206,7 +205,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       cleanUpLatch.await();
     } catch (final InterruptedException e) {
       logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
-      // TODO InterruptedException
+      // TODO we should retry to wait for the latch
     } finally {
       super.close();
       clearQueue();
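
The updated TODO points at a standard fix: keep waiting on the latch across interrupts, then restore the thread's interrupt status once the latch is down. One conventional shape for that, sketched against the cleanUpLatch and logger fields above:

    // Retry-on-interrupt, one standard shape for the TODO above.
    boolean interrupted = false;
    try {
      while (true) {
        try {
          cleanUpLatch.await();
          break;
        } catch (final InterruptedException e) {
          logger.warn("Interrupted while waiting for producer cleanup; retrying.", e);
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // let callers still see the interrupt
      }
    }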

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 407f05d..cebefa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -92,9 +92,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     if (schema != null) {
       if (getSelectionVector4().next()) {
         return IterOutcome.OK;
-      } else {
-        return IterOutcome.NONE;
       }
+
+      return IterOutcome.NONE;
     }
 
     try{

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index b5b1b0a..cb04244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -185,7 +185,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  public void close(){
+  public void close() {
     super.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index f1da1db..701ead5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -171,7 +171,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   @Override
   protected void buildSchema() throws SchemaChangeException {
     logger.trace("buildSchema()");
-    IterOutcome outcome = next(incoming);
+    final IterOutcome outcome = next(incoming);
     switch (outcome) {
       case NONE:
         state = BatchState.DONE;
@@ -208,7 +208,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     container.clear();
 
     // all existing vectors will be transferred to the outgoing container in framer.doWork()
-    for (VectorWrapper wrapper : batch) {
+    for (final VectorWrapper<?> wrapper : batch) {
       container.addOrGet(wrapper.getField());
     }
 
@@ -292,10 +292,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       cg.setMappingSet(rightMapping);
       ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
 
-      LogicalExpression fh =
+      final LogicalExpression fh =
         FunctionGenerationHelper
           .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
-      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+      final ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
       cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
     cg.getEvalBlock()._return(JExpr.TRUE);

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index ed32f43..31deada 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -134,7 +134,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
     dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
     copierAllocator = oContext.getAllocator().getChildAllocator(
-        context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
+        context, PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION, true);
     FragmentHandle handle = context.getHandle();
     fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
         handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
@@ -188,7 +188,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         sv4.clear();
       }
       if (copier != null) {
-        copier.cleanup();
+        copier.close();
       }
       copierAllocator.close();
       super.close();
@@ -707,7 +707,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         g.setMappingSet(MAIN_MAPPING);
         copier = context.getImplementationClass(cg);
       } else {
-        copier.cleanup();
+        copier.close();
       }
 
       BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator();

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 37529ff..d42e8d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -37,14 +37,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Queues;
 
-public abstract class MSortTemplate implements MSorter, IndexedSortable{
+public abstract class MSortTemplate implements MSorter, IndexedSortable {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
   private long compares;
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
-  private Queue<Integer> newRunStarts;
   private FragmentContext context;
 
   /**
@@ -67,7 +66,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       final int newBatch = this.vector4.get(i) >>> 16;
       if (newBatch == batch) {
         continue;
-      } else if(newBatch == batch + 1) {
+      } else if (newBatch == batch + 1) {
         runStarts.add(i);
         batch = newBatch;
       } else {
@@ -135,7 +134,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       }
 
       int outIndex = 0;
-      newRunStarts = Queues.newLinkedBlockingQueue();
+      final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
       newRunStarts.add(outIndex);
       final int size = runStarts.size();
       for (int i = 0; i < size / 2; i++) {
@@ -155,9 +154,9 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       }
       final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount);
       aux.clear();
-      aux = this.vector4.createNewWrapperCurrent(desiredRecordBatchCount);
+      aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount);
       vector4.clear();
-      this.vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount);
+      vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount);
       tmp.clear();
       runStarts = newRunStarts;
     }
@@ -198,5 +197,4 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index 161ca6a..e0d9c2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -25,15 +25,18 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 
-public interface PriorityQueueCopier {
-  public static long initialAllocation = 10000000;
-  public static long maxAllocation = 20000000;
+public interface PriorityQueueCopier extends AutoCloseable {
+  public static final long INITIAL_ALLOCATION = 10000000;
+  public static final long MAX_ALLOCATION = 20000000;
+
+  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch,
+      List<BatchGroup> batchGroups, VectorAccessible outgoing) throws SchemaChangeException;
 
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
-                    VectorAccessible outgoing) throws SchemaChangeException;
   public int next(int targetRecordCount);
-  public void cleanup();
 
-  public static TemplateClassDefinition<PriorityQueueCopier> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueueCopier>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
+  public final static TemplateClassDefinition<PriorityQueueCopier> TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
 
+  @Override
+  abstract public void close(); // specify this to leave out the Exception
 }
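
The closing comment above is worth spelling out: AutoCloseable.close() is declared to throw Exception, but an extending interface may redeclare it without the throws clause, so call sites can close a copier — or use try-with-resources — without a catch block. Minimal illustration (hypothetical names):

    class NarrowedClose {
      interface QuietlyCloseable extends AutoCloseable {
        @Override
        void close(); // the checked Exception from AutoCloseable.close() is narrowed away
      }

      static void demo(QuietlyCloseable c) {
        // No "catch (Exception e)" is required: the compiler sees the narrowed signature.
        try (QuietlyCloseable resource = c) {
          // use the resource
        }
      }
    }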

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index facf192..891907a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -66,7 +66,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     allocateVectors(targetRecordCount);
     for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
       if (queueSize == 0) {
-        cleanup();
+        close();
         return 0;
       }
       int compoundIndex = vector4.get(0);
@@ -96,12 +96,12 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     vector4.clear();
-    for (VectorWrapper w: outgoing) {
+    for (final VectorWrapper<?> w: outgoing) {
       w.getValueVector().clear();
     }
-    for (VectorWrapper w : hyperBatch) {
+    for (final VectorWrapper<?> w : hyperBatch) {
       w.clear();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 8731739..d8f703e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
   protected final VectorContainer container;
   protected final T popConfig;
@@ -51,8 +51,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
-      final OperatorContext oContext) throws OutOfMemoryException {
-    super();
+      final OperatorContext oContext) {
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
@@ -119,6 +118,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return next;
   }
 
+  @Override
   public final IterOutcome next() {
     try {
       stats.startProcessing();
@@ -174,11 +174,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   protected abstract void killIncoming(boolean sendUpstream);
 
-  public void close(){
+  @Override
+  public void close() {
     container.clear();
   }
 
-
   @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();
@@ -199,7 +199,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return container.getValueAccessorById(clazz, ids);
   }
 
-
   @Override
   public WritableBatch getWritableBatch() {
 //    logger.debug("Getting writable batch.");
@@ -212,5 +211,4 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
-
 }
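
The new logger declaration in AbstractRecordBatch (and in AbstractSingleRecordBatch below) replaces a per-instance getClass() lookup, which would have named the concrete subclass, with a static constant naming the declaring class. The anonymous-class trick works because getEnclosingClass() on a class declared inside Example returns Example.class, so the line can be copy-pasted between classes and still name the right class, unlike a hard-coded class literal:

    class Example {
      // The anonymous object is declared inside Example, so getEnclosingClass()
      // yields Example.class even after this line is pasted into another class.
      private static final org.slf4j.Logger logger =
          org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
    }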

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index dd90cab..e84057b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -25,7 +25,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 
 public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
   protected final RecordBatch incoming;
   protected boolean outOfMemory = false;
@@ -51,7 +51,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
     IterOutcome upstream = next(incoming);
     if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
       do {
-        for (VectorWrapper w : incoming) {
+        for (final VectorWrapper<?> w : incoming) {
           w.clear();
         }
       } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
@@ -118,9 +118,9 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   public BatchSchema getSchema() {
     if (container.hasSchema()) {
       return container.getSchema();
-    } else {
-      return null;
     }
+
+    return null;
   }
 
   protected abstract boolean setupNewSchema() throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index f2f9450..732129a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -25,21 +25,19 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.rpc.data.AckSender;
 
 public class RawFragmentBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
 
   private final FragmentRecordBatch header;
   private final DrillBuf body;
   private final AckSender sender;
-
-  private AtomicBoolean ackSent = new AtomicBoolean(false);
+  private final AtomicBoolean ackSent = new AtomicBoolean(false);
 
   public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
-    super();
     this.header = header;
-    this.body = body;
     this.sender = sender;
+    this.body = body;
     if (body != null) {
-      body.retain();
+      body.retain(1);
     }
   }
 
@@ -58,11 +56,10 @@ public class RawFragmentBatch {
 
   public void release() {
     if (body != null) {
-      body.release();
+      body.release(1);
     }
   }
 
-
   public AckSender getSender() {
     return sender;
   }
@@ -80,5 +77,4 @@ public class RawFragmentBatch {
   public boolean isAckSent() {
     return ackSent.get();
   }
-
 }
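
The body buffer is reference counted in the Netty style, and the switch from retain()/release() to retain(1)/release(1) makes the pairing explicit: this batch takes exactly one reference in the constructor and gives exactly one back in release(). The contract, annotated:

    // Reference-counting contract the retain(1)/release(1) pair expresses: every
    // owner takes exactly one reference and returns exactly one; the buffer is
    // freed when the count reaches zero.
    if (body != null) {
      body.retain(1);   // this RawFragmentBatch now co-owns the buffer
    }
    // ... later, exactly once per batch ...
    if (body != null) {
      body.release(1);  // drop this batch's reference; may free the buffer
    }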

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 8e3b9e5..55ae309 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.StackTrace;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -34,9 +35,11 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
   private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
 
@@ -63,14 +66,14 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
   public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
     if (logger.isTraceEnabled()) {
       logger.trace("Loading record batch with def {} and data {}", def, buf);
-      logger.trace("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
+      logger.trace("Load, ThreadID: {}\n{}", Thread.currentThread().getId(), new StackTrace());
     }
     container.zeroVectors();
     valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
 
     final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
-    for (final VectorWrapper wrapper : container) {
+    for(final VectorWrapper<?> wrapper : container) {
       final ValueVector vector = wrapper.getValueVector();
       oldFields.put(vector.getField(), vector);
     }
@@ -79,7 +82,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     try {
       final List<SerializedField> fields = def.getFieldList();
       int bufOffset = 0;
-      for (final SerializedField field : fields) {
+      for(final SerializedField field : fields) {
         final MaterializedField fieldDef = MaterializedField.create(field);
         ValueVector vector = oldFields.remove(fieldDef);
 
@@ -106,7 +109,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 
       // rebuild the schema.
       final SchemaBuilder builder = BatchSchema.newBuilder();
-      for (VectorWrapper<?> v : newVectors) {
+      for (final VectorWrapper<?> v : newVectors) {
         builder.addField(v.getField());
       }
       builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -116,7 +119,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     } catch (final Throwable cause) {
       // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
       // adjudicate to call upper layer specific clean up logic.
-      for (final VectorWrapper wrapper:newVectors) {
+      for (final VectorWrapper<?> wrapper:newVectors) {
         wrapper.getValueVector().clear();
       }
       throw cause;
@@ -132,12 +135,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     return schemaChanged;
   }
 
+  @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
     return container.getValueVectorId(path);
   }
 
-
-
 //
 //  @SuppressWarnings("unchecked")
 //  public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) {
@@ -152,10 +154,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 //    return (T) v;
 //  }
 
+  @Override
   public int getRecordCount() {
     return valueCount;
   }
 
+  @Override
   public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
     return container.getValueAccessorById(clazz, ids);
   }
@@ -170,11 +174,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     return this.container.iterator();
   }
 
-  public BatchSchema getSchema(){
+  @Override
+  public BatchSchema getSchema() {
     return schema;
   }
 
-  public void clear(){
+  public void clear() {
     container.clear();
   }
 
@@ -184,7 +189,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 
     // rebuild the schema.
     SchemaBuilder b = BatchSchema.newBuilder();
-    for(VectorWrapper<?> v : container){
+    for(final VectorWrapper<?> v : container){
       b.addField(v.getField());
     }
     b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
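
The trace logging above no longer constructs a throwaway RuntimeException just to capture a stack trace; it logs an org.apache.drill.common.StackTrace instead. A minimal sketch of what such a utility can look like (an assumed implementation, not the actual Drill class):

    // Capture the current stack at construction, render it lazily in toString().
    // (Assumed implementation; the real org.apache.drill.common.StackTrace may differ.)
    public class StackTrace {
      private final StackTraceElement[] frames = Thread.currentThread().getStackTrace();

      @Override
      public String toString() {
        final StringBuilder sb = new StringBuilder();
        for (int i = 2; i < frames.length; i++) { // skip getStackTrace() and the constructor
          sb.append("  at ").append(frames[i]).append('\n');
        }
        return sb.toString();
      }
    }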

http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 6e27628..8ca3ec8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -80,7 +80,7 @@ public abstract class AbstractRecordReader implements RecordReader {
 
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
-    for (ValueVector v : vectorMap.values()) {
+    for (final ValueVector v : vectorMap.values()) {
       v.allocateNew();
     }
   }