You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/09/12 17:17:41 UTC

[32/37] DRILL-1402: Add check-style rules for trailing space, TABs and blocks without braces

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index 26d881d..f6e11c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -63,8 +63,9 @@ public class SingleMergeExchange extends AbstractExchange {
   protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations)
       throws PhysicalOperatorSetupException {
 
-    if (receiverLocations.size() != 1)
+    if (receiverLocations.size() != 1) {
       throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint");
+    }
     receiverLocation = receiverLocations.iterator().next();
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index cafdbdd..bf2b4a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -48,7 +48,9 @@ public class UnionExchange extends AbstractExchange{
 
   @Override
   protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
-    if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    if (receiverLocations.size() != 1) {
+      throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    }
     this.destinationLocation = receiverLocations.iterator().next();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7f97624..e25f1c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -41,9 +41,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
 
   private RootExec root = null;
 
-  private ImplCreator(){}
+  private ImplCreator() {}
 
-  private RootExec getRoot(){
+  private RootExec getRoot() {
     return root;
   }
 
@@ -78,7 +78,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
 
   public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     ImplCreator i = new ImplCreator();
-    if(AssertionUtil.isAssertionsEnabled()){
+    if (AssertionUtil.isAssertionsEnabled()) {
       root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
     }
 
@@ -86,9 +86,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     watch.start();
     root.accept(i, context);
     logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
-    if (i.root == null)
+    if (i.root == null) {
       throw new ExecutionSetupException(
           "The provided fragment did not have a root node that correctly created a RootExec value.");
+    }
     return i.getRoot();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
index 8c768e5..82a9a63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -42,7 +42,9 @@ public class OperatorCreatorRegistry {
 
   public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
     Object opCreator = instanceRegistry.get(operator);
-    if (opCreator != null) return opCreator;
+    if (opCreator != null) {
+      return opCreator;
+    }
 
     Constructor<?> c = constructorRegistry.get(operator);
     if(c == null) {
@@ -75,9 +77,9 @@ public class OperatorCreatorRegistry {
           Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
           interfaceFound = true;
           boolean constructorFound = false;
-          for(Constructor<?> constructor : operatorClass.getConstructors()){
+          for (Constructor<?> constructor : operatorClass.getConstructors()) {
             Class<?>[] params = constructor.getParameterTypes();
-            if(params.length == 0){
+            if (params.length == 0) {
               Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
               if (old != null) {
                 throw new RuntimeException(
@@ -88,7 +90,7 @@ public class OperatorCreatorRegistry {
               constructorFound = true;
             }
           }
-          if(!constructorFound){
+          if (!constructorFound) {
             logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
                 operatorClass.getCanonicalName());
           }
@@ -97,4 +99,5 @@ public class OperatorCreatorRegistry {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c2a03b9..2712e27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,8 +83,9 @@ public class ScanBatch implements RecordBatch {
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
-    if (!readers.hasNext())
+    if (!readers.hasNext()) {
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+    }
     this.currentReader = readers.next();
     this.oContext = new OperatorContext(subScanConfig, context);
     this.currentReader.setOperatorContext(this.oContext);
@@ -121,7 +122,7 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public void kill(boolean sendUpstream) {
-    if(currentReader != null){
+    if (currentReader != null) {
       currentReader.cleanup();
     }
 
@@ -220,8 +221,8 @@ public class ScanBatch implements RecordBatch {
 
   private void addPartitionVectors() throws ExecutionSetupException{
     try {
-      if(partitionVectors != null){
-        for(ValueVector v : partitionVectors){
+      if (partitionVectors != null) {
+        for (ValueVector v : partitionVectors) {
           v.clear();
         }
       }
@@ -290,7 +291,9 @@ public class ScanBatch implements RecordBatch {
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator());
-        if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+        if (!clazz.isAssignableFrom(v.getClass())) {
+          throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+        }
         container.add(v);
         fieldVectorMap.put(field.key(), v);
 
@@ -342,9 +345,9 @@ public class ScanBatch implements RecordBatch {
     return WritableBatch.get(this);
   }
 
-  public void cleanup(){
+  public void cleanup() {
     container.clear();
-    for(ValueVector v : partitionVectors){
+    for (ValueVector v : partitionVectors) {
       v.clear();
     }
     fieldVectorMap.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2b7fdf3..352deae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -79,7 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     @Override
     public boolean innerNext() {
-      if(!ok){
+      if (!ok) {
         incoming.kill(false);
 
         return false;
@@ -93,7 +93,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         out = IterOutcome.NONE;
       }
 //      logger.debug("Outcome of sender next {}", out);
-      switch(out){
+      switch (out) {
       case STOP:
       case NONE:
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
@@ -158,7 +158,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       @Override
       public void success(Ack value, ByteBuf buf) {
         sendCount.decrement();
-        if(value.getOk()) return;
+        if (value.getOk()) {
+          return;
+        }
 
         logger.error("Downstream fragment was not accepted.  Stopping future sends.");
         // if we didn't get ack ok, we'll need to kill the query.
@@ -170,5 +172,4 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 6eede30..473e3a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -132,10 +132,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   @Override
   public IterOutcome innerNext() {
-    if(schema != null){
-      if(getSelectionVector4().next()){
+    if (schema != null) {
+      if (getSelectionVector4().next()) {
         return IterOutcome.OK;
-      }else{
+      } else {
         return IterOutcome.NONE;
       }
     }
@@ -156,8 +156,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           return upstream;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
-          if(!incoming.getSchema().equals(schema)){
-            if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+          if (!incoming.getSchema().equals(schema)) {
+            if (schema != null) {
+              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+            }
             this.schema = incoming.getSchema();
           }
           // fall through.
@@ -181,7 +183,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         }
       }
 
-      if (schema == null){
+      if (schema == null) {
         // builder may be null at this point if the first incoming batch is empty
         return IterOutcome.NONE;
       }
@@ -196,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
       return IterOutcome.OK_NEW_SCHEMA;
 
-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
       kill(false);
       logger.error("Failure during query", ex);
       context.fail(ex);
@@ -215,7 +217,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     if (copier == null) {
       copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch);
     } else {
-      for(VectorWrapper<?> i : batch){
+      for (VectorWrapper<?> i : batch) {
 
         ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
         newContainer.add(v);
@@ -227,7 +229,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       int count = selectionVector4.getCount();
       int copiedRecords = copier.copyRecords(0, count);
       assert copiedRecords == count;
-      for(VectorWrapper<?> v : newContainer){
+      for (VectorWrapper<?> v : newContainer) {
         ValueVector.Mutator m = v.getValueVector().getMutator();
         m.setValueCount(count);
       }
@@ -253,11 +255,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     ClassGenerator<PriorityQueue> g = cg.getRoot();
     g.setMappingSet(mainMapping);
 
-    for(Ordering od : orderings){
+    for (Ordering od : orderings) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
-      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      }
       g.setMappingSet(leftMapping);
       HoldingContainer left = g.addExpr(expr, false);
       g.setMappingSet(rightMapping);
@@ -269,9 +273,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       HoldingContainer out = g.addExpr(fh, false);
       JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 
-      if(od.getDirection() == Direction.ASCENDING){
+      if (od.getDirection() == Direction.ASCENDING) {
         jc._then()._return(out.getValue());
-      }else{
+      } else {
         jc._then()._return(out.getValue().minus());
       }
       g.rotateBlock();
@@ -377,5 +381,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 58dd247..92d1882 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -82,10 +82,12 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
         }
 
         /* Inject trace operator */
-        if (list.size() > 0)
-            newOp = op.getNewWithChildren(list);
-            newOp.setOperatorId(op.getOperatorId());
+        if (list.size() > 0) {
+          newOp = op.getNewWithChildren(list);
+        }
+        newOp.setOperatorId(op.getOperatorId());
 
         return newOp;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 99eeed3..8c1a4c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -82,8 +82,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     IterOutcome upstream;
     do {
       upstream = next(incoming);
-      if(first && upstream == IterOutcome.OK)
+      if(first && upstream == IterOutcome.OK) {
         upstream = IterOutcome.OK_NEW_SCHEMA;
+      }
       first = false;
 
       switch(upstream) {
@@ -91,14 +92,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         case NONE:
         case STOP:
           cleanup();
-          if (upstream == IterOutcome.STOP)
+          if (upstream == IterOutcome.STOP) {
             return upstream;
+          }
           break;
 
         case OK_NEW_SCHEMA:
           try{
             setupNewSchema();
-          }catch(Exception ex){
+          } catch(Exception ex) {
             kill(false);
             logger.error("Failure during query", ex);
             context.fail(ex);
@@ -113,9 +115,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
             throw new RuntimeException(ex);
           }
 
-          for(VectorWrapper v : incoming)
+          for(VectorWrapper v : incoming) {
             v.getValueVector().clear();
-
+          }
           break;
 
         default:
@@ -176,4 +178,5 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       throw new RuntimeException("Failed to close RecordWriter", ex);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index e9be2ac..c522870 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -82,7 +82,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public int getRecordCount() {
-    if(done) return 0;
+    if (done) {
+      return 0;
+    }
     return aggregator.getOutputCount();
   }
 
@@ -102,7 +104,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       case STOP:
         return outcome;
       case OK_NEW_SCHEMA:
-        if (!createAggregator()){
+        if (!createAggregator()) {
           done = true;
           return IterOutcome.STOP;
         }
@@ -131,10 +133,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
 
-    while(true){
+    while (true) {
       AggOutcome out = aggregator.doWork();
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
+      switch (out) {
       case CLEANUP_AND_RETURN:
         container.zeroVectors();
         aggregator.cleanup();
@@ -150,7 +152,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
         aggregator = null;
-        if(!createAggregator()){
+        if (!createAggregator()) {
           return IterOutcome.STOP;
         }
         continue;
@@ -168,23 +170,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
-    try{
+    try {
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
       return true;
-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
       context.fail(ex);
       container.clear();
       incoming.kill(false);
       return false;
-    }finally{
+    } finally {
       stats.stopSetup();
     }
   }
 
   private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
-  	CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-  	ClassGenerator<HashAggregator> cg = top.getRoot();
+    CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashAggregator> cg = top.getRoot();
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
 
     container.clear();
@@ -199,10 +201,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
     int i;
 
-    for(i = 0; i < numGroupByExprs; i++) {
+    for (i = 0; i < numGroupByExprs; i++) {
       NamedExpression ne = popConfig.getGroupByExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -211,13 +215,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       groupByOutFieldIds[i] = container.add(vv);
     }
 
-    for(i = 0; i < numAggrExprs; i++){
+    for (i = 0; i < numAggrExprs; i++) {
       NamedExpression ne = popConfig.getAggrExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
 
-      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      }
 
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -248,7 +256,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     return agg;
   }
 
-
   private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
     cg.setMappingSet(UpdateAggrValuesMapping);
 
@@ -260,8 +267,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
-  private void setupGetIndex(ClassGenerator<HashAggregator> cg){
-    switch(incoming.getSchema().getSelectionVectorMode()){
+  private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
+    switch (incoming.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE: {
       JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
       cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b6b8874..d25a952 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -156,7 +156,9 @@ public abstract class HashAggTemplate implements HashAggregator {
       boolean status = true;
       for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, batchOutputCount) ) {
-          if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+          if (EXTRA_DEBUG_2) {
+            logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+          }
           batchOutputCount++;
           outNumRecordsHolder.value++;
         } else {
@@ -270,31 +272,41 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       outside: while(true) {
         // loop through existing records, aggregating the values as necessary.
-        if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
+        if (EXTRA_DEBUG_1) {
+          logger.debug ("Starting outer loop of doWork()...");
+        }
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if(EXTRA_DEBUG_2) {
+            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          }
           boolean success = checkGroupAndAggrValues(currentIndex);
           assert success : "HashAgg couldn't copy values.";
         }
 
-        if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
+        if (EXTRA_DEBUG_1) {
+          logger.debug("Processed {} records", underlyingIndex);
+        }
 
-        try{
+        try {
 
-          while(true){
+          while (true) {
             // Cleanup the previous batch since we are done processing it.
             for (VectorWrapper<?> v : incoming) {
               v.getValueVector().clear();
             }
             IterOutcome out = outgoing.next(0, incoming);
-            if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
-            switch(out){
+            if (EXTRA_DEBUG_1) {
+              logger.debug("Received IterOutcome of {}", out);
+            }
+            switch (out) {
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
 
             case OK_NEW_SCHEMA:
-              if(EXTRA_DEBUG_1) logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
+              if (EXTRA_DEBUG_1) {
+                logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
+              }
               newSchema = true;
               this.cleanup();
               // TODO: new schema case needs to be handled appropriately
@@ -302,14 +314,16 @@ public abstract class HashAggTemplate implements HashAggregator {
 
             case OK:
               resetIndex();
-              if(incoming.getRecordCount() == 0){
+              if (incoming.getRecordCount() == 0) {
                 continue;
               } else {
                 boolean success = checkGroupAndAggrValues(currentIndex);
                 assert success : "HashAgg couldn't copy values.";
                 incIndex();
 
-                if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
+                if (EXTRA_DEBUG_1) {
+                  logger.debug("Continuing outside loop");
+                }
                 continue outside;
               }
 
@@ -343,8 +357,10 @@ public abstract class HashAggTemplate implements HashAggregator {
           // placeholder...
         }
       }
-    } finally{
-      if(first) first = !first;
+    } finally {
+      if (first) {
+        first = !first;
+      }
     }
   }
 
@@ -373,7 +389,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   @Override
-  public void cleanup(){
+  public void cleanup() {
     if (htable != null) {
       htable.clear();
       htable = null;
@@ -392,28 +408,28 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
-  private final AggOutcome setOkAndReturn(){
-    if(first){
+  private final AggOutcome setOkAndReturn() {
+    if (first) {
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    }else{
+    } else {
       this.outcome = IterOutcome.OK;
     }
-    for(VectorWrapper<?> v : outgoing){
+    for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
     return AggOutcome.RETURN_OUTCOME;
   }
 
-  private final void incIndex(){
+  private final void incIndex() {
     underlyingIndex++;
-    if(underlyingIndex >= incoming.getRecordCount()){
+    if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
     currentIndex = getVectorIndex(underlyingIndex);
   }
 
-  private final void resetIndex(){
+  private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }
@@ -422,7 +438,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     BatchHolder bh = new BatchHolder();
     batchHolders.add(bh);
 
-    if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+    if (EXTRA_DEBUG_1) {
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+    }
 
     bh.setup();
   }
@@ -465,9 +483,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       outputCount += numOutputRecords;
 
-      if(first){
+      if (first) {
         this.outcome = IterOutcome.OK_NEW_SCHEMA;
-      }else{
+      } else {
         this.outcome = IterOutcome.OK;
       }
 
@@ -486,14 +504,14 @@ public abstract class HashAggTemplate implements HashAggregator {
     } else {
       if (!outputKeysStatus) {
         logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
-        for(VectorWrapper<?> v : outContainer) {
+        for (VectorWrapper<?> v : outContainer) {
           logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
         }
         context.fail(new Exception("Failed to output keys for current batch !"));
       }
       if (!outputValuesStatus) {
         logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
-        for(VectorWrapper<?> v : outContainer) {
+        for (VectorWrapper<?> v : outContainer) {
           logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
         }
         context.fail(new Exception("Failed to output values for current batch !"));
@@ -557,7 +575,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
 
       if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
-        if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Group-by key already present in hash table, updating the aggregate values");
+        }
 
         // debugging
         //if (holder.value == 100018 || holder.value == 100021) {
@@ -566,7 +586,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       }
       else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
-        if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+        }
 
         // debugging
         // if (holder.value == 100018 || holder.value == 100021) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4277f23..238242b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -40,7 +40,7 @@ public interface HashAggregator {
 
   public static enum AggOutcome {
     RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
-	  }
+  }
 
   public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
                              OperatorStats stats, BufferAllocator allocator, RecordBatch incoming,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 3e6def1..e690060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -34,8 +34,8 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
   private final SelectionVector2 sv2;
   private final SelectionVector4 sv4;
 
-  public InternalBatch(RecordBatch incoming){
-    switch(incoming.getSchema().getSelectionVectorMode()){
+  public InternalBatch(RecordBatch incoming) {
+    switch(incoming.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE:
       this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
       this.sv2 = null;
@@ -69,13 +69,17 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
     return container.iterator();
   }
 
-  public void clear(){
-    if(sv2 != null) sv2.clear();
-    if(sv4 != null) sv4.clear();
+  public void clear() {
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    if (sv4 != null) {
+      sv4.clear();
+    }
     container.clear();
   }
 
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds) {
     return container.getValueAccessorById(clazz, fieldIds);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 820f722..ced5179 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,8 +67,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public int getRecordCount() {
-    if(done) return 0;
-    if (aggregator == null) return 0;
+    if (done) {
+      return 0;
+    }
+    if (aggregator == null) {
+      return 0;
+    }
     return aggregator.getOutputCount();
   }
 
@@ -88,7 +92,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       case STOP:
         return outcome;
       case OK_NEW_SCHEMA:
-        if (!createAggregator()){
+        if (!createAggregator()) {
           done = true;
           return IterOutcome.STOP;
         }
@@ -100,12 +104,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       }
     }
 
-    while(true){
+    while (true) {
       AggOutcome out = aggregator.doWork();
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
+      switch (out) {
       case CLEANUP_AND_RETURN:
-        if (!first) container.zeroVectors();
+        if (!first) {
+          container.zeroVectors();
+        }
         done = true;
         // fall through
       case RETURN_OUTCOME:
@@ -122,7 +128,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       case UPDATE_AGGREGATOR:
         first = false;
         aggregator = null;
-        if(!createAggregator()){
+        if (!createAggregator()) {
           return IterOutcome.STOP;
       }
       continue;
@@ -142,23 +148,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
-    try{
+    try {
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
       return true;
-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
       context.fail(ex);
       container.clear();
       incoming.kill(false);
       return false;
-    }finally{
+    } finally {
       stats.stopSetup();
     }
   }
 
-
-
-
   private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     container.clear();
@@ -169,20 +172,24 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     ErrorCollector collector = new ErrorCollectorImpl();
 
-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
       NamedExpression ne = popConfig.getKeys()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }
       keyExprs[i] = expr;
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       keyOutputIds[i] = container.add(vector);
     }
 
-    for(int i =0; i < valueExprs.length; i++){
+    for (int i =0; i < valueExprs.length; i++) {
       NamedExpression ne = popConfig.getExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -190,7 +197,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
     }
 
-    if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }
 
     setupIsSame(cg, keyExprs);
     setupIsSameApart(cg, keyExprs);
@@ -207,15 +216,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     return agg;
   }
 
-
-
   private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
   private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
   private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
 
-  private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+  private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(IS_SAME_I1);
-    for(LogicalExpression expr : keyExprs){
+    for (LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       cg.setMappingSet(IS_SAME_I1);
       HoldingContainer first = cg.addExpr(expr, false);
@@ -234,9 +241,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
   private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
 
-  private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+  private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(ISA_B1);
-    for(LogicalExpression expr : keyExprs){
+    for (LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       cg.setMappingSet(ISA_B1);
       HoldingContainer first = cg.addExpr(expr, false);
@@ -254,9 +261,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
   private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
 
-  private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
+  private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
     cg.setMappingSet(EVAL);
-    for(LogicalExpression ex : valueExprs){
+    for (LogicalExpression ex : valueExprs) {
       HoldingContainer hc = cg.addExpr(ex);
       cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
@@ -265,9 +272,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
 
-  private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+  private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
     cg.setMappingSet(RECORD_KEYS);
-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
       HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
       cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
@@ -280,10 +287,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
   private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
 
-  private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+  private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
     cg.setMappingSet(RECORD_KEYS_PREV);
 
-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
       // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same.  This is possible because InternalBatch guarantees this.
       logger.debug("Writing out expr {}", keyExprs[i]);
       cg.rotateBlock();
@@ -297,8 +304,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
-  private void getIndex(ClassGenerator<StreamingAggregator> g){
-    switch(incoming.getSchema().getSelectionVectorMode()){
+  private void getIndex(ClassGenerator<StreamingAggregator> g) {
+    switch (incoming.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE: {
       JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
       g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 53ac1ed..c2a5715 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -60,7 +60,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
 
   private void allocateOutgoing() {
-    for(VectorWrapper<?> w : outgoing){
+    for (VectorWrapper<?> w : outgoing) {
       w.getValueVector().allocateNew();
     }
   }
@@ -75,7 +75,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     return outputCount;
   }
 
-  private AggOutcome tooBigFailure(){
+  private AggOutcome tooBigFailure() {
     context.fail(new Exception(TOO_BIG_ERROR));
     this.outcome = IterOutcome.STOP;
     return AggOutcome.CLEANUP_AND_RETURN;
@@ -87,11 +87,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       outcome = IterOutcome.NONE;
       return AggOutcome.CLEANUP_AND_RETURN;
     }
-    try{ // outside loop to ensure that first is set to false after the first run.
+    try { // outside loop to ensure that first is set to false after the first run.
       outputCount = 0;
 
       // if we're in the first state, allocate outgoing.
-      if(first){
+      if (first) {
         allocateOutgoing();
       }
 
@@ -119,8 +119,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       }
 
       // pick up a remainder batch if we have one.
-      if(remainderBatch != null){
-        if (!outputToBatch( previousIndex )) return tooBigFailure();
+      if (remainderBatch != null) {
+        if (!outputToBatch( previousIndex )) {
+          return tooBigFailure();
+        }
         remainderBatch.clear();
         remainderBatch = null;
         return setOkAndReturn();
@@ -131,38 +133,56 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       if (pendingOutput) {
         allocateOutgoing();
         pendingOutput = false;
-        if(EXTRA_DEBUG) logger.debug("Attempting to output remainder.");
-        if (!outputToBatch( previousIndex)) return tooBigFailure();
+        if (EXTRA_DEBUG) {
+          logger.debug("Attempting to output remainder.");
+        }
+        if (!outputToBatch( previousIndex)) {
+          return tooBigFailure();
+        }
       }
 
-      if(newSchema){
+      if (newSchema) {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
 
-      if(lastOutcome != null){
+      if (lastOutcome != null) {
         outcome = lastOutcome;
         return AggOutcome.CLEANUP_AND_RETURN;
       }
 
-      outside: while(true){
+      outside: while(true) {
       // loop through existing records, adding as necessary.
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if (EXTRA_DEBUG) {
+            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          }
           if (previousIndex == -1) {
-            if (EXTRA_DEBUG) logger.debug("Adding the initial row's keys and values.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Adding the initial row's keys and values.");
+            }
             addRecordInc(currentIndex);
           }
           else if (isSame( previousIndex, currentIndex )) {
-            if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Values were found the same, adding.");
+            }
             addRecordInc(currentIndex);
           } else {
-            if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Values were different, outputting previous batch.");
+            }
             if (outputToBatch(previousIndex)) {
-              if(EXTRA_DEBUG) logger.debug("Output successful.");
+              if (EXTRA_DEBUG) {
+                logger.debug("Output successful.");
+              }
               addRecordInc(currentIndex);
             } else {
-              if(EXTRA_DEBUG) logger.debug("Output failed.");
-              if(outputCount == 0) return tooBigFailure();
+              if (EXTRA_DEBUG) {
+                logger.debug("Output failed.");
+              }
+              if (outputCount == 0) {
+                return tooBigFailure();
+              }
 
               // mark the pending output but move forward for the next cycle.
               pendingOutput = true;
@@ -178,23 +198,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
         InternalBatch previous = null;
 
-        try{
-          while(true){
+        try {
+          while (true) {
             if (previous != null) {
               previous.clear();
             }
             previous = new InternalBatch(incoming);
             IterOutcome out = outgoing.next(0, incoming);
-            if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
-            switch(out){
+            if (EXTRA_DEBUG) {
+              logger.debug("Received IterOutcome of {}", out);
+            }
+            switch (out) {
             case NONE:
               done = true;
               lastOutcome = out;
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();
-              } else if(addedRecordCount > 0){
-                if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
-                if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+              } else if(addedRecordCount > 0) {
+                if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+                  remainderBatch = previous;
+                }
+                if (EXTRA_DEBUG) {
+                  logger.debug("Received no more batches, returning.");
+                }
                 return setOkAndReturn();
               }else{
                 if (first && out == IterOutcome.OK) {
@@ -204,17 +230,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 return AggOutcome.CLEANUP_AND_RETURN;
               }
 
-
-
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
 
             case OK_NEW_SCHEMA:
-              if(EXTRA_DEBUG) logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-              if(addedRecordCount > 0){
-                if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
-                if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+              if (EXTRA_DEBUG) {
+                logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
+              }
+              if (addedRecordCount > 0) {
+                if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+                  remainderBatch = previous;
+                }
+                if (EXTRA_DEBUG) {
+                  logger.debug("Wrote out end of previous batch, returning.");
+                }
                 newSchema = true;
                 return setOkAndReturn();
               }
@@ -222,21 +252,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               return AggOutcome.UPDATE_AGGREGATOR;
             case OK:
               resetIndex();
-              if(incoming.getRecordCount() == 0){
+              if (incoming.getRecordCount() == 0) {
                 continue;
-              }else{
-                if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){
-                  if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+              } else {
+                if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) {
+                  if (EXTRA_DEBUG) {
+                    logger.debug("New value was same as last value of previous batch, adding.");
+                  }
                   addRecordInc(currentIndex);
                   previousIndex = currentIndex;
                   incIndex();
-                  if(EXTRA_DEBUG) logger.debug("Continuing outside");
+                  if (EXTRA_DEBUG) {
+                    logger.debug("Continuing outside");
+                  }
                   continue outside;
-                }else{ // not the same
-                  if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+                } else { // not the same
+                  if (EXTRA_DEBUG) {
+                    logger.debug("This is not the same as the previous, add record and continue outside.");
+                  }
                   previousIndex = currentIndex;
-                  if(addedRecordCount > 0){
-                    if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+                  if (addedRecordCount > 0) {
+                    if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
                       remainderBatch = previous;
                       return setOkAndReturn();
                     }
@@ -251,72 +287,78 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               return AggOutcome.CLEANUP_AND_RETURN;
             }
 
-
         }
-        }finally{
+        } finally {
           // make sure to clear previous if we haven't saved it.
-          if(remainderBatch == null && previous != null){
+          if (remainderBatch == null && previous != null) {
             previous.clear();
           }
         }
       }
-    }finally{
-      if(first) first = !first;
+    } finally {
+      if (first) {
+        first = !first;
+      }
     }
 
   }
 
-
-  private final void incIndex(){
+  private final void incIndex() {
     underlyingIndex++;
-    if(underlyingIndex >= incoming.getRecordCount()){
+    if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
     currentIndex = getVectorIndex(underlyingIndex);
   }
 
-  private final void resetIndex(){
+  private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }
 
-  private final AggOutcome setOkAndReturn(){
-    if(first){
+  private final AggOutcome setOkAndReturn() {
+    if (first) {
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    }else{
+    } else {
       this.outcome = IterOutcome.OK;
     }
-    for(VectorWrapper<?> v : outgoing){
+    for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
     return AggOutcome.RETURN_OUTCOME;
   }
 
-  private final boolean outputToBatch(int inIndex){
+  private final boolean outputToBatch(int inIndex) {
 
-    if(!outputRecordKeys(inIndex, outputCount)){
-      if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount);
+    if (!outputRecordKeys(inIndex, outputCount)) {
+      if(EXTRA_DEBUG) {
+        logger.debug("Failure while outputting keys {}", outputCount);
+      }
       return false;
     }
 
-    if(!outputRecordValues(outputCount)){
-      if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount);
+    if (!outputRecordValues(outputCount)) {
+      if (EXTRA_DEBUG) {
+        logger.debug("Failure while outputting values {}", outputCount);
+      }
       return false;
     }
 
-    if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount);
+    if (EXTRA_DEBUG) {
+      logger.debug("{} values output successfully", outputCount);
+    }
     resetValues();
     outputCount++;
     addedRecordCount = 0;
     return true;
   }
 
-  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){
+  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
     boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
         && outputRecordValues(outIndex) //
         && resetValues();
-    if(success){
+    if (success) {
       resetValues();
       outputCount++;
       addedRecordCount = 0;
@@ -325,17 +367,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     return success;
   }
 
-  private void addRecordInc(int index){
+  private void addRecordInc(int index) {
     addRecord(index);
     this.addedRecordCount++;
   }
 
   @Override
-  public void cleanup(){
-    if(remainderBatch != null) remainderBatch.clear();
+  public void cleanup() {
+    if (remainderBatch != null) {
+      remainderBatch.clear();
+    }
   }
 
-
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 8f5f29b..96da00b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -28,8 +28,8 @@ public interface StreamingAggregator {
   public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
 
   public static enum AggOutcome {
-	    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
-	  }
+    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+  }
 
   public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 195d249..f77407e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -53,23 +53,23 @@ public class ChainedHashTable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
 
   private static final GeneratorMapping KEY_MATCH_BUILD =
-	  GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
+    GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
                             null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping KEY_MATCH_PROBE =
-	  GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
+    GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
                             null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping GET_HASH_BUILD =
-	  GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
+    GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
                             null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping GET_HASH_PROBE =
-	  GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
+    GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
                             null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping SET_VALUE =
-	  GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
+    GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
                             null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping OUTPUT_KEYS =
@@ -138,8 +138,12 @@ public class ChainedHashTable {
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
-      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      if (expr == null) continue;
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      }
+      if (expr == null) {
+        continue;
+      }
       keyExprsBuild[i] = expr;
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
@@ -155,8 +159,12 @@ public class ChainedHashTable {
       i = 0;
       for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
         final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
-        if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-        if (expr == null) continue;
+        if (collector.hasErrors()) {
+          throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+        }
+        if (expr == null) {
+          continue;
+        }
         keyExprsProbe[i] = expr;
         i++;
       }
@@ -293,4 +301,3 @@ public class ChainedHashTable {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b03880c..6024523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable {
       assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
       assert (incomingRowIdx < HashTable.BATCH_SIZE);
 
-      if (isProbe)
+      if (isProbe) {
         match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
-      else
+      } else {
         match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
+      }
 
       if (! match) {
         currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch);
@@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable {
 
       maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
 
-      if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+      if (EXTRA_DEBUG) {
+        logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+      }
 
       return true;
     }
@@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable {
           newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
           newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
 
-          if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          if (EXTRA_DEBUG) {
+            logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          }
 
         } else {
           // follow the new table's hash chain until we encounter empty slot. Note that the hash chain could
@@ -245,7 +250,9 @@ public abstract class HashTableTemplate implements HashTable {
               newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
               newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
 
-              if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+              if (EXTRA_DEBUG) {
+                logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+              }
 
               break;
             } else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
@@ -253,7 +260,9 @@ public abstract class HashTableTemplate implements HashTable {
               newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain
               newHashValues.getMutator().setSafe(entryIdxWithinBatch,  hash);
 
-              if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+              if (EXTRA_DEBUG) {
+                logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+              }
 
               break;
             }
@@ -381,11 +390,19 @@ public abstract class HashTableTemplate implements HashTable {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
-    if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
-    if (initialCap <= 0) throw new IllegalArgumentException("The initial capacity must be greater than 0");
-    if (initialCap > MAXIMUM_CAPACITY) throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+    if (loadf <= 0 || Float.isNaN(loadf)) {
+      throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
+    }
+    if (initialCap <= 0) {
+      throw new IllegalArgumentException("The initial capacity must be greater than 0");
+    }
+    if (initialCap > MAXIMUM_CAPACITY) {
+      throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+    }
 
-    if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+    if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) {
+      throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+    }
 
     this.htConfig = htConfig;
     this.context = context;
@@ -397,8 +414,9 @@ public abstract class HashTableTemplate implements HashTable {
 
     // round up the initial capacity to nearest highest power of 2
     tableSize = roundUpToPowerOf2(initialCap);
-    if (tableSize > MAXIMUM_CAPACITY)
+    if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
+    }
 
     threshold = (int) Math.ceil(tableSize * loadf);
 
@@ -500,7 +518,9 @@ public abstract class HashTableTemplate implements HashTable {
       currentIdx = freeIndex++;
       addBatchIfNeeded(currentIdx);
 
-      if (EXTRA_DEBUG) logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+      if (EXTRA_DEBUG) {
+        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+      }
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
         // update the start index array
@@ -543,14 +563,16 @@ public abstract class HashTableTemplate implements HashTable {
       currentIdx = freeIndex++;
       addBatchIfNeeded(currentIdx);
 
-      if (EXTRA_DEBUG) logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+      if (EXTRA_DEBUG) {
+        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+      }
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
         htIdxHolder.value = currentIdx;
         return PutStatus.KEY_ADDED;
-      }
-      else
+      } else {
         return PutStatus.PUT_FAILED;
+      }
     }
 
     return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED ;
@@ -618,7 +640,9 @@ public abstract class HashTableTemplate implements HashTable {
 
     if (currentIdx >= totalBatchSize) {
       BatchHolder bh = addBatchHolder();
-      if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+      if (EXTRA_DEBUG) {
+        logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+      }
       return bh;
     }
     else {
@@ -638,12 +662,15 @@ public abstract class HashTableTemplate implements HashTable {
   // in the new table.. the metadata consists of the startIndices, links and hashValues.
   // Note that the keys stored in the BatchHolders are not moved around.
   private void resizeAndRehashIfNeeded() {
-    if (numEntries < threshold)
+    if (numEntries < threshold) {
       return;
+    }
 
     long t0 = System.currentTimeMillis();
 
-    if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+    if (EXTRA_DEBUG) {
+      logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+    }
 
     // If the table size is already MAXIMUM_CAPACITY, don't resize
     // the table, but set the threshold to Integer.MAX_VALUE such that
@@ -656,8 +683,9 @@ public abstract class HashTableTemplate implements HashTable {
     int newSize = 2 * tableSize;
 
     tableSize = roundUpToPowerOf2(newSize);
-    if (tableSize > MAXIMUM_CAPACITY)
+    if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
+    }
 
     // set the new threshold based on the new table size and load factor
     threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
@@ -717,5 +745,3 @@ public abstract class HashTableTemplate implements HashTable {
   protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
 
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index bf00194..f1fcce0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -79,7 +79,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   protected void doWork() {
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
-//    for(VectorWrapper<?> v : container){
+//    for (VectorWrapper<?> v : container) {
 //      ValueVector.Mutator m = v.getValueVector().getMutator();
 //      m.setValueCount(recordCount);
 //    }
@@ -88,8 +88,12 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
   @Override
   public void cleanup() {
-    if(sv2 != null) sv2.clear();
-    if(sv4 != null) sv4.clear();
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    if (sv4 != null) {
+      sv4.clear();
+    }
     super.cleanup();
   }
 
@@ -100,7 +104,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
       sv2.clear();
     }
 
-    switch(incoming.getSchema().getSelectionVectorMode()){
+    switch (incoming.getSchema().getSelectionVectorMode()) {
       case NONE:
         sv2 = new SelectionVector2(oContext.getAllocator());
         this.filter = generateSV2Filterer();
@@ -137,13 +141,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
 
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
-    if(collector.hasErrors()){
+    if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-//    for(VectorWrapper<?> i : incoming){
+//    for (VectorWrapper<?> i : incoming) {
 //      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
 //      container.add(v);
 //      allocators.add(getAllocator4(v));
@@ -177,13 +181,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
 
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
-    if(collector.hasErrors()){
+    if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-    for(VectorWrapper<?> v : incoming){
+    for (VectorWrapper<?> v : incoming) {
       TransferPair pair = v.getValueVector().getTransferPair();
       container.add(pair.getTo());
       transfers.add(pair);
@@ -202,5 +206,4 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 15044b8..2a08c05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -457,8 +457,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         return hj;
     }
 
-    private void allocateVectors(){
-      for(VectorWrapper<?> v : container){
+    private void allocateVectors() {
+      for(VectorWrapper<?> v : container) {
         v.getValueVector().allocateNew();
       }
     }
@@ -472,7 +472,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     private void updateStats(HashTable htable) {
-      if(htable == null) return;
+      if (htable == null) {
+        return;
+      }
       htable.getStats(htStats);
       this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
       this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
@@ -488,7 +490,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     @Override
     public void cleanup() {
-        if(hjHelper != null){
+        if (hjHelper != null) {
           hjHelper.clear();
         }
 
@@ -504,4 +506,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         left.cleanup();
         right.cleanup();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 785deae..133289e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -94,11 +94,13 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
     boolean success = true;
     while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
       success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
-      if(success){
+      if (success) {
         recordsProcessed++;
         outputRecords++;
-      }else{
-        if(outputRecords == 0) throw new IllegalStateException("Too big to fail.");
+      } else {
+        if (outputRecords == 0) {
+          throw new IllegalStateException("Too big to fail.");
+        }
         break;
       }
     }
@@ -166,11 +168,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
             boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
                 &&  projectProbeRecord(recordsProcessed, outputRecords);
-            if(!success){
+            if (!success) {
               // we failed to project.  redo this record.
               getNextRecord = false;
               return;
-            }else{
+            } else {
               outputRecords++;
 
               /* Projected single row from the build side with matching key but there
@@ -182,8 +184,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
                  * from the probe side. Drain the next row in the probe side.
                  */
                 recordsProcessed++;
-              }
-              else {
+              } else {
                 /* There is more than one row with the same key on the build side
                  * don't drain more records from the probe side till we have projected
                  * all the rows with this key
@@ -197,10 +198,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
             // If we have a left outer join, project the keys
             if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
               boolean success = projectProbeRecord(recordsProcessed, outputRecords);
-              if(!success){
-                if(outputRecords == 0){
+              if (!success) {
+                if (outputRecords == 0) {
                   throw new IllegalStateException("Record larger than single batch.");
-                }else{
+                } else {
                   // we've output some records but failed to output this one.  return and wait for next call.
                   return;
                 }
@@ -214,10 +215,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
         hjHelper.setRecordMatched(currentCompositeIdx);
         boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
             && projectProbeRecord(recordsProcessed, outputRecords);
-        if(!success){
-          if(outputRecords == 0){
+        if (!success) {
+          if (outputRecords == 0) {
             throw new IllegalStateException("Record larger than single batch.");
-          }else{
+          } else {
             // we've output some records but failed to output this one.  return and wait for next call.
             return;
           }
@@ -264,5 +265,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
                                @Named("outgoing") RecordBatch outgoing);
   public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+
   public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+
 }