You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/09/05 21:05:07 UTC

[2/3] drill git commit: DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 5afe66b..4d623cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,18 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.union;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -39,88 +36,96 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.resolver.TypeCastRules;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
 
-  private List<MaterializedField> outputFields;
+  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private UnionAllInput unionAllInput;
-  private RecordBatch current;
-
   private final List<TransferPair> transfers = Lists.newArrayList();
-  private List<ValueVector> allocationVectors;
-  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private List<ValueVector> allocationVectors = Lists.newArrayList();
   private int recordCount = 0;
-  private boolean schemaAvailable = false;
+  private UnionInputIterator unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
-    super(config, context, false);
-    assert (children.size() == 2) : "The number of the operands of Union must be 2";
-    unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
-  }
-
-  @Override
-  public int getRecordCount() {
-    return recordCount;
+    super(config, context, true, children.get(0), children.get(1));
   }
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-    unionAllInput.getLeftRecordBatch().kill(sendUpstream);
-    unionAllInput.getRightRecordBatch().kill(sendUpstream);
+    left.kill(sendUpstream);
+    right.kill(sendUpstream);
   }
 
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
-  }
+  protected void buildSchema() throws SchemaChangeException {
+    if (! prefetchFirstBatchFromBothSides()) {
+      return;
+    }
 
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
+    unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
+
+    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsOneSide(right.getSchema());
+    } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsOneSide((left.getSchema()));
+    } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    VectorAccessibleUtilities.allocateVectors(container, 0);
+    VectorAccessibleUtilities.setValueCount(container,0);
   }
 
   @Override
   public IterOutcome innerNext() {
     try {
-      IterOutcome upstream = unionAllInput.nextBatch();
-      logger.debug("Upstream of Union-All: {}", upstream);
-      switch (upstream) {
+      while (true) {
+        if (!unionInputIterator.hasNext()) {
+          return IterOutcome.NONE;
+        }
+
+        Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+        IterOutcome upstream = nextBatch.left;
+        RecordBatch incoming = nextBatch.right;
+
+        switch (upstream) {
         case NONE:
         case OUT_OF_MEMORY:
         case STOP:
           return upstream;
-
         case OK_NEW_SCHEMA:
-          outputFields = unionAllInput.getOutputFields();
+          return doWork(nextBatch.right, true);
         case OK:
-          IterOutcome workOutcome = doWork();
-
-          if (workOutcome != IterOutcome.OK) {
-            return workOutcome;
-          } else {
-            return upstream;
+          // Skip batches that have the same schema as the previous one but contain 0 rows.
+          if (incoming.getRecordCount() == 0) {
+            VectorAccessibleUtilities.clear(incoming);
+            continue;
           }
+          return doWork(nextBatch.right, false);
         default:
           throw new IllegalStateException(String.format("Unknown state %s.", upstream));
+        }
       }
     } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
       context.fail(ex);
@@ -130,120 +135,75 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
+  public int getRecordCount() {
+    return recordCount;
   }
 
-  private void setValueCount(int count) {
-    for (ValueVector v : allocationVectors) {
-      ValueVector.Mutator m = v.getMutator();
-      m.setValueCount(count);
-    }
-  }
-
-  private boolean doAlloc() {
-    for (ValueVector v : allocationVectors) {
-      try {
-        AllocationHelper.allocateNew(v, current.getRecordCount());
-      } catch (OutOfMemoryException ex) {
-        return false;
-      }
-    }
-    return true;
-  }
 
   @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
-    if (allocationVectors != null) {
-      for (ValueVector v : allocationVectors) {
-        v.clear();
-      }
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
+    Preconditions.checkArgument(inputBatch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
+        "Input batch and output batch have different field counthas!");
+
+    if (newSchema) {
+      createUnionAller(inputBatch);
     }
 
-    allocationVectors = Lists.newArrayList();
-    transfers.clear();
+    container.zeroVectors();
+    VectorUtil.allocateVectors(allocationVectors, inputBatch.getRecordCount());
+    recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+    VectorUtil.setValueCount(allocationVectors, recordCount);
 
-    // If both sides of Union-All are empty
-    if (unionAllInput.isBothSideEmpty()) {
-      for (MaterializedField materializedField : outputFields) {
-        final String colName = materializedField.getName();
-        final MajorType majorType = MajorType.newBuilder()
-            .setMinorType(MinorType.INT)
-            .setMode(DataMode.OPTIONAL)
-            .build();
-
-        MaterializedField outputField = MaterializedField.create(colName, majorType);
-        ValueVector vv = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vv);
-      }
-
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      return IterOutcome.OK;
     }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
+    transfers.clear();
+    allocationVectors.clear();
 
     final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    cg.getCodeGenerator().saveCodeForDebugging(true);
+    //    cg.getCodeGenerator().saveCodeForDebugging(true);
+
     int index = 0;
-    for (VectorWrapper<?> vw : current) {
-       ValueVector vvIn = vw.getValueVector();
-      // get the original input column names
-      SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
-      // get the renamed column names
-      SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getName());
+    for(VectorWrapper<?> vw : inputBatch) {
+      ValueVector vvIn = vw.getValueVector();
+      ValueVector vvOut = container.getValueVector(index).getValueVector();
 
       final ErrorCollector collector = new ErrorCollectorImpl();
       // According to input data names, Minortypes, Datamodes, choose to
       // transfer directly,
       // rename columns or
       // cast data types (Minortype or DataMode)
-      if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
+      if (container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField())
+          && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+          ) {
         // Transfer column
+        TransferPair tp = vvIn.makeTransferPair(vvOut);
+        transfers.add(tp);
+      } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
+        continue;
+      } else { // Copy data through a generated write expression, casting type or mode where needed
+        SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
+        MaterializedField inField = vvIn.getField();
+        MaterializedField outputField = vvOut.getField();
 
-        MajorType outputFieldType = outputFields.get(index).getType();
-        MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
-                                                                  outputFieldType);
-
-        /*
-          todo: Fix if condition when DRILL-4824 is merged
-          If condition should be changed to:
-          `if (outputFields.get(index).getName().equals(inputPath.getRootSegmentPath())) {`
-          DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
-          Root cause is missing indication of child column in map types when it is null.
-          DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
-          Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
-          Unit test - TestJsonReader.testKvgenWithUnionAll().
-         */
-        if (outputFields.get(index).getName().equals(inputPath)) {
-          ValueVector vvOut = container.addOrGet(outputField);
-          TransferPair tp = vvIn.makeTransferPair(vvOut);
-          transfers.add(tp);
-        // Copy data in order to rename the column
-        } else {
-          final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
-          if (collector.hasErrors()) {
-            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-          }
+        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
 
-          ValueVector vv = container.addOrGet(outputField, callBack);
-          allocationVectors.add(vv);
-          TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-          ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
-          cg.addExpr(write);
-        }
-      // Cast is necessary
-      } else {
-        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
         if (collector.hasErrors()) {
           throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
         }
 
         // If the inputs' DataMode is required and the outputs' DataMode is not required
         // cast to the one with the least restriction
-        if (vvIn.getField().getType().getMode() == DataMode.REQUIRED
-            && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
-          expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
+        if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
+            && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
+          expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
           if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
           }
@@ -251,442 +211,163 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
 
         // If two inputs' MinorTypes are different,
         // Insert a cast before the Union operation
-        if (vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
-          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
+        if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
+          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
           if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
           }
         }
 
-        final MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
-                                                                        expr.getMajorType());
-        ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
-        TypedFieldId fid = container.getValueVectorId(outputPath);
+        TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
 
-        boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         cg.addExpr(write);
+
+        allocationVectors.add(vvOut);
       }
       ++index;
     }
 
     unionall = context.getImplementationClass(cg.getCodeGenerator());
-    unionall.setup(context, current, this, transfers);
-
-    if (!schemaAvailable) {
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      schemaAvailable = true;
-    }
-
-    if (!doAlloc()) {
-      return IterOutcome.OUT_OF_MEMORY;
-    }
-
-    recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
-    setValueCount(recordCount);
-    return IterOutcome.OK;
-  }
-
-  public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
-    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
-        && (leftField.getType().getMode() == rightField.getType().getMode());
+    unionall.setup(context, inputBatch, this, transfers);
   }
 
-  // This method is used by inner class to point the reference `current` to the correct record batch
-  private void setCurrentRecordBatch(RecordBatch target) {
-    this.current = target;
-  }
 
-  // This method is used by inner class to clear the current record batch
-  private void clearCurrentRecordBatch() {
-    for (VectorWrapper<?> v: current) {
-      v.clear();
-    }
-  }
-
-  public static class UnionAllInput {
-    private UnionAllRecordBatch unionAllRecordBatch;
-    private List<MaterializedField> outputFields;
-    private OneSideInput leftSide;
-    private OneSideInput rightSide;
-    private IterOutcome upstream = IterOutcome.NOT_YET;
-    private boolean leftIsFinish = false;
-    private boolean rightIsFinish = false;
-
-    // These two schemas are obtained from the first record batches of the left and right inputs
-    // They are used to check if the schema is changed between recordbatches
-    private BatchSchema leftSchema;
-    private BatchSchema rightSchema;
-    private boolean bothEmpty = false;
-
-    public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
-      this.unionAllRecordBatch = unionAllRecordBatch;
-      leftSide = new OneSideInput(left);
-      rightSide = new OneSideInput(right);
-    }
-
-    private void setBothSideEmpty(boolean bothEmpty) {
-      this.bothEmpty = bothEmpty;
-    }
-
-    private boolean isBothSideEmpty() {
-      return bothEmpty;
-    }
-
-    public IterOutcome nextBatch() throws SchemaChangeException {
-      if (upstream == RecordBatch.IterOutcome.NOT_YET) {
-        IterOutcome iterLeft = leftSide.nextBatch();
-        switch (iterLeft) {
-          case OK_NEW_SCHEMA:
-            /*
-             * If the first few record batches are all empty,
-             * there is no way to tell whether these empty batches are coming from empty files.
-             * It is incorrect to infer output types when either side could be coming from empty.
-             *
-             * Thus, while-loop is necessary to skip those empty batches.
-             */
-            whileLoop:
-            while (leftSide.getRecordBatch().getRecordCount() == 0) {
-              iterLeft = leftSide.nextBatch();
-
-              switch(iterLeft) {
-                case STOP:
-                case OUT_OF_MEMORY:
-                  return iterLeft;
-
-                case NONE:
-                  // Special Case: The left side was an empty input.
-                  leftIsFinish = true;
-                  break whileLoop;
-
-                case NOT_YET:
-                case OK_NEW_SCHEMA:
-                case OK:
-                  continue whileLoop;
-
-                default:
-                  throw new IllegalStateException(
-                      String.format("Unexpected state %s.", iterLeft));
-              }
-            }
-
-            break;
-          case STOP:
-          case OUT_OF_MEMORY:
-            return iterLeft;
-
-          default:
-            throw new IllegalStateException(
-                String.format("Unexpected state %s.", iterLeft));
-        }
-
-        IterOutcome iterRight = rightSide.nextBatch();
-        switch (iterRight) {
-          case OK_NEW_SCHEMA:
-            // Unless there is no record batch on the left side of the inputs,
-            // always start processing from the left side.
-            if (leftIsFinish) {
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-            } else {
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-            }
-            // If the record count of the first batch from right input is zero,
-            // there are two possibilities:
-            // 1. The right side is an empty input (e.g., file).
-            // 2. There will be more records carried by later batches.
-
-            /*
-             * If the first few record batches are all empty,
-             * there is no way to tell whether these empty batches are coming from empty files.
-             * It is incorrect to infer output types when either side could be coming from empty.
-             *
-             * Thus, while-loop is necessary to skip those empty batches.
-             */
-            whileLoop:
-            while (rightSide.getRecordBatch().getRecordCount() == 0) {
-              iterRight = rightSide.nextBatch();
-              switch (iterRight) {
-                case STOP:
-                case OUT_OF_MEMORY:
-                  return iterRight;
-
-                case NONE:
-                  // Special Case: The right side was an empty input.
-                  rightIsFinish = true;
-                  break whileLoop;
-
-                case NOT_YET:
-                case OK_NEW_SCHEMA:
-                case OK:
-                  continue whileLoop;
-
-                default:
-                  throw new IllegalStateException(
-                      String.format("Unexpected state %s.", iterRight));
-              }
-            }
-
-            if (leftIsFinish && rightIsFinish) {
-              setBothSideEmpty(true);
-            }
-
-            inferOutputFields();
-            break;
-
-          case STOP:
-          case OUT_OF_MEMORY:
-            return iterRight;
-
-          default:
-            throw new IllegalStateException(
-                String.format("Unexpected state %s.", iterRight));
-        }
-
-
-
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-        return upstream;
+  // The output table's column names always follow the left table,
+  // while each output type is chosen based on Drill's implicit casting rules.
+  private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
+//    outputFields = Lists.newArrayList();
+    final Iterator<MaterializedField> leftIter = leftSchema.iterator();
+    final Iterator<MaterializedField> rightIter = rightSchema.iterator();
+
+    int index = 1;
+    while (leftIter.hasNext() && rightIter.hasNext()) {
+      MaterializedField leftField  = leftIter.next();
+      MaterializedField rightField = rightIter.next();
+
+      if (leftField.hasSameTypeAndMode(rightField)) {
+        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+        builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
+        container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
+      } else if (Types.isUntypedNull(rightField.getType())) {
+        container.addOrGet(leftField, callBack);
+      } else if (Types.isUntypedNull(leftField.getType())) {
+        container.addOrGet(MaterializedField.create(leftField.getName(), rightField.getType()), callBack);
       } else {
-        if (isBothSideEmpty()) {
-          return IterOutcome.NONE;
-        }
-
-        unionAllRecordBatch.clearCurrentRecordBatch();
-
-        if (leftIsFinish && rightIsFinish) {
-          upstream = IterOutcome.NONE;
-          return upstream;
-        } else if (leftIsFinish) {
-          IterOutcome iterOutcome = rightSide.nextBatch();
-
-          switch (iterOutcome) {
-            case NONE:
-              rightIsFinish = true;
-              // fall through
-            case STOP:
-            case OUT_OF_MEMORY:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK_NEW_SCHEMA:
-              if (!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
-                throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
-              }
-              iterOutcome = IterOutcome.OK;
-              // fall through
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-          }
-        } else if (rightIsFinish) {
-          IterOutcome iterOutcome = leftSide.nextBatch();
-          switch (iterOutcome) {
-            case STOP:
-            case OUT_OF_MEMORY:
-            case NONE:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
-          }
-        } else {
-          IterOutcome iterOutcome = leftSide.nextBatch();
-
-          switch (iterOutcome) {
-            case STOP:
-            case OUT_OF_MEMORY:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK_NEW_SCHEMA:
-              if (!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
-                throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
-              }
-
-              iterOutcome = IterOutcome.OK;
-              // fall through
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            case NONE:
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-              upstream = IterOutcome.OK;
-              leftIsFinish = true;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-          }
-        }
-      }
-    }
-
-    /**
-     *
-     * Summarize the inference in the four different situations:
-     * First of all, the field names are always determined by the left side
-     * (Even when the left side is from an empty file, we have the column names.)
-     *
-     * Cases:
-     * 1. Left: non-empty; Right: non-empty
-     *      types determined by both sides with implicit casting involved
-     * 2. Left: empty; Right: non-empty
-     *      type from the right
-     * 3. Left: non-empty; Right: empty
-     *      types from the left
-     * 4. Left: empty; Right: empty
-     *      types are nullable integer
-     */
-    private void inferOutputFields() {
-      if (!leftIsFinish && !rightIsFinish) {
-        // Both sides are non-empty
-        inferOutputFieldsBothSide();
-      } else if (!rightIsFinish) {
-        // Left side is non-empty
-        // While use left side's column names as output column names,
-        // use right side's column types as output column types.
-        inferOutputFieldsFromSingleSide(
-            leftSide.getRecordBatch().getSchema(),
-            rightSide.getRecordBatch().getSchema());
-      } else {
-        // Either right side is empty or both are empty
-        // Using left side's schema is sufficient
-        inferOutputFieldsFromSingleSide(
-            leftSide.getRecordBatch().getSchema(),
-            leftSide.getRecordBatch().getSchema());
-      }
-    }
-
-    // The output table's column names always follow the left table,
-    // where the output type is chosen based on DRILL's implicit casting rules
-    private void inferOutputFieldsBothSide() {
-      outputFields = Lists.newArrayList();
-      leftSchema = leftSide.getRecordBatch().getSchema();
-      rightSchema = rightSide.getRecordBatch().getSchema();
-      Iterator<MaterializedField> leftIter = leftSchema.iterator();
-      Iterator<MaterializedField> rightIter = rightSchema.iterator();
-
-      int index = 1;
-      while (leftIter.hasNext() && rightIter.hasNext()) {
-        MaterializedField leftField  = leftIter.next();
-        MaterializedField rightField = rightIter.next();
-
-        if (hasSameTypeAndMode(leftField, rightField)) {
-          MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+        // If the two input fields differ in type or mode,
+        // cast the column of one of the tables to the least restrictive data type.
+        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
+        if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
+          builder.setMinorType(leftField.getType().getMinorType());
           builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
-          outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
         } else {
-          // If the output type is not the same,
-          // cast the column of one of the table to a data type which is the Least Restrictive
-          MajorType.Builder builder = MajorType.newBuilder();
-          if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
-            builder.setMinorType(leftField.getType().getMinorType());
-            builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
-          } else {
-            List<MinorType> types = Lists.newLinkedList();
-            types.add(leftField.getType().getMinorType());
-            types.add(rightField.getType().getMinorType());
-            MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
-            if (outputMinorType == null) {
-              throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
-                  " on the left side and " + rightField.getType().getMinorType().toString() +
-                  " on the right side in column " + index + " of UNION ALL");
-            }
-            builder.setMinorType(outputMinorType);
+          List<TypeProtos.MinorType> types = Lists.newLinkedList();
+          types.add(leftField.getType().getMinorType());
+          types.add(rightField.getType().getMinorType());
+          TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
+          if (outputMinorType == null) {
+            throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
+                " on the left side and " + rightField.getType().getMinorType().toString() +
+                " on the right side in column " + index + " of UNION ALL");
           }
-
-          // The output data mode should be as flexible as the more flexible one from the two input tables
-          List<DataMode> dataModes = Lists.newLinkedList();
-          dataModes.add(leftField.getType().getMode());
-          dataModes.add(rightField.getType().getMode());
-          builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
-
-          outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
+          builder.setMinorType(outputMinorType);
         }
-        ++index;
-      }
 
-      assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
-    }
-
-    private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
-      outputFields = Lists.newArrayList();
+        // The output data mode should be as flexible as the more flexible one from the two input tables
+        List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
+        dataModes.add(leftField.getType().getMode());
+        dataModes.add(rightField.getType().getMode());
+        builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
 
-      final List<String> outputColumnNames = Lists.newArrayList();
-      for (MaterializedField materializedField : schemaForNames) {
-        outputColumnNames.add(materializedField.getName());
-      }
-
-      final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
-      for (int i = 0; iterForTypes.hasNext(); ++i) {
-        MaterializedField field = iterForTypes.next();
-        outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
+        container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
       }
+      ++index;
     }
 
-    public List<MaterializedField> getOutputFields() {
-      if (outputFields == null) {
-        throw new NullPointerException("Output fields have not been inferred");
-      }
-
-      return outputFields;
-    }
+    assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
+  }
 
-    public void killIncoming(boolean sendUpstream) {
-      leftSide.getRecordBatch().kill(sendUpstream);
-      rightSide.getRecordBatch().kill(sendUpstream);
+  private void inferOutputFieldsOneSide(final BatchSchema schema) {
+    for (MaterializedField field : schema) {
+      container.addOrGet(field, callBack);
     }
+  }
 
-    public RecordBatch getLeftRecordBatch() {
-      return leftSide.getRecordBatch();
-    }
+  private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
+    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
+        && (leftField.getType().getMode() == rightField.getType().getMode());
+  }
 
-    public RecordBatch getRightRecordBatch() {
-      return rightSide.getRecordBatch();
+  private class BatchStatusWrappper {
+    boolean prefetched;
+    final RecordBatch batch;
+    final int inputIndex;
+    final IterOutcome outcome;
+
+    BatchStatusWrappper(boolean prefetched, IterOutcome outcome, RecordBatch batch, int inputIndex) {
+      this.prefetched = prefetched;
+      this.outcome = outcome;
+      this.batch = batch;
+      this.inputIndex = inputIndex;
     }
+  }
 
-    private class OneSideInput {
-      private IterOutcome upstream = IterOutcome.NOT_YET;
-      private RecordBatch recordBatch;
+  private class UnionInputIterator implements Iterator<Pair<IterOutcome, RecordBatch>> {
+    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
-      public OneSideInput(RecordBatch recordBatch) {
-        this.recordBatch = recordBatch;
+    UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
+      if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
+        batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, right, 1));
       }
 
-      public RecordBatch getRecordBatch() {
-        return recordBatch;
+      if (leftOutCome == IterOutcome.OK_NEW_SCHEMA) {
+        batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, left, 0));
       }
+    }
 
-      public IterOutcome nextBatch() {
-        if (upstream == IterOutcome.NONE) {
-          throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-        }
+    @Override
+    public boolean hasNext() {
+      return ! batchStatusStack.isEmpty();
+    }
 
-        if (upstream == IterOutcome.NOT_YET) {
-          upstream = unionAllRecordBatch.next(recordBatch);
+    @Override
+    public Pair<IterOutcome, RecordBatch> next() {
+      while (!batchStatusStack.isEmpty()) {
+        BatchStatusWrappper topStatus = batchStatusStack.peek();
 
-          return upstream;
+        if (topStatus.prefetched) {
+          topStatus.prefetched = false;
+          return Pair.of(topStatus.outcome, topStatus.batch);
         } else {
-          do {
-            upstream = unionAllRecordBatch.next(recordBatch);
-          } while (upstream == IterOutcome.OK && recordBatch.getRecordCount() == 0);
-
-          return upstream;
+          IterOutcome outcome = UnionAllRecordBatch.this.next(topStatus.inputIndex, topStatus.batch);
+          switch (outcome) {
+          case OK:
+          case OK_NEW_SCHEMA:
+            return Pair.of(outcome, topStatus.batch);
+          case OUT_OF_MEMORY:
+          case STOP:
+            batchStatusStack.pop();
+            return Pair.of(outcome, topStatus.batch);
+          case NONE:
+            batchStatusStack.pop();
+            if (batchStatusStack.isEmpty()) {
+              return Pair.of(IterOutcome.NONE, null);
+            }
+            break;
+          default:
+            throw new IllegalStateException(String.format("Unexpected state %s", outcome));
+          }
         }
       }
+
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 2be1ed5..a8ee0de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -249,14 +249,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
           // OK doesn't change high-level state.
           break;
         case NONE:
-          // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if
-          // already terminated (checked above).
-          if (validationState != ValidationState.HAVE_SCHEMA) {
-            throw new IllegalStateException(
-                String.format(
-                    "next() returned %s without first returning %s [#%d, %s]",
-                    batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
-          }
+          // NONE is allowed even without first seeing an OK_NEW_SCHEMA. Such a NONE is
+          // called a FAST NONE.
           // NONE moves to terminal high-level state.
           validationState = ValidationState.TERMINAL;
           break;
@@ -306,12 +300,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                   "Incoming batch [#%d, %s] has a null schema. This is not allowed.",
                   instNum, batchTypeName));
         }
-        if (lastSchema.getFieldCount() == 0) {
-          throw new IllegalStateException(
-              String.format(
-                  "Incoming batch [#%d, %s] has an empty schema. This is not allowed.",
-                  instNum, batchTypeName));
-        }
+        // It's legal for a batch to have zero fields. For instance, a relational table could have
+        // zero columns. Querying such a table requires execution operators to process batches with 0 fields.
         if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
           throw new IllegalStateException(
               String.format(

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index 2298df5..a8eddbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -39,6 +39,6 @@ public class ValuesBatchCreator implements BatchCreator<Values> {
     assert children.isEmpty();
 
     JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(), null, Collections.singletonList(SchemaPath.getSimplePath("*")));
-    return new ScanBatch(config, context, Iterators.singletonIterator((RecordReader) reader));
+    return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 7e4483b..df80a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -42,8 +42,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.type.RelDataType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import org.apache.drill.exec.util.Utilities;
 
 /**
  * GroupScan of a Drill table.
@@ -160,12 +159,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     final ScanStats stats = groupScan.getScanStats(settings);
     int columnCount = getRowType().getFieldCount();
     double ioCost = 0;
-    boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return Preconditions.checkNotNull(input).equals("*");
-      }
-    }).isPresent();
+    boolean isStarQuery = Utilities.isStarQuery(columns);
 
     if (isStarQuery) {
       columnCount = STAR_COLUMN_COST;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 25cd717..d974bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -35,18 +35,45 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 
+/**
+ * A physical Prel node for Project operator.
+ */
 public class ProjectPrel extends DrillProjectRelBase implements Prel{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectPrel.class);
 
+  private final boolean outputProj;
 
   public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
       RelDataType rowType) {
+    this(cluster, traits, child, exps, rowType, false);
+  }
+
+  /**
+   * Constructor for ProjectPrel.
+   * @param cluster
+   * @param traits traits of ProjectPrel node
+   * @param child  input
+   * @param exps   list of RexNode, representing expressions of projection.
+   * @param rowType output rowType of projection expression.
+   * @param outputProj true if ProjectPrel is inserted by {@link org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor}.
+   *                   Such a top Project operator does the following processing before the result is presented to Screen/Writer:
+   *                   <ol>
+   *                   <li>ensure final output field names are preserved</li>
+   *                   <li>handle cases where input does not return any batch (a fast NONE) (see ProjectRecordBatch.handleNullInput() method)</li>
+   *                   <li>handle cases where expressions in upstream operator were evaluated to NULL type
+   *                   (Null type will be converted into Nullable-INT)</li>
+   *                   </ol>
+   *                   false otherwise.
+   */
+  public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+      RelDataType rowType, boolean outputProj) {
     super(DRILL_PHYSICAL, cluster, traits, child, exps, rowType);
+    this.outputProj = outputProj;
   }
 
   @Override
   public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> exps, RelDataType rowType) {
-    return new ProjectPrel(getCluster(), traitSet, input, exps, rowType);
+    return new ProjectPrel(getCluster(), traitSet, input, exps, rowType, this.outputProj);
   }
 
 
@@ -57,7 +84,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     org.apache.drill.exec.physical.config.Project p = new org.apache.drill.exec.physical.config.Project(
-        this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))),  childPOP);
+        this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))),  childPOP, outputProj);
     return creator.addMetadata(this, p);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 587b006..08bd9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -133,9 +133,23 @@ public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeExcept
         prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
 
     RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
-    ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
-
-    return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject;
+    ProjectPrel topProject = new ProjectPrel(prel.getCluster(),
+        prel.getTraitSet(),
+        prel,
+        projections,
+        newRowType,
+        true);  //outputProj = true : NONE -> OK_NEW_SCHEMA, also handle expression with NULL type.
+
+    if (prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true)) {
+      return new ProjectPrel(prel.getCluster(),
+          prel.getTraitSet(),
+          ((Project) prel).getInput(),
+          ((Project) prel).getProjects(),
+          prel.getRowType(),
+          true); //outputProj = true : NONE -> OK_NEW_SCHEMA, also handle expression with NULL type.
+    } else {
+      return topProject;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
new file mode 100644
index 0000000..1137922
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends  AbstractRecordBatch<T> {
+  protected final RecordBatch left;
+  protected final RecordBatch right;
+
+  // state (IterOutcome) of the left input
+  protected IterOutcome leftUpstream = IterOutcome.NONE;
+
+  // state (IterOutcome) of the right input
+  protected IterOutcome rightUpstream = IterOutcome.NONE;
+
+  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left,
+      RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, true, context.newOperatorContext(popConfig));
+    this.left = left;
+    this.right = right;
+  }
+
+  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, RecordBatch left,
+      RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, buildSchema);
+    this.left = left;
+    this.right = right;
+  }
+
+  /**
+   * Prefetch first batch from both inputs.
+   * @return true if caller should continue processing
+   *         false if caller should stop and exit from processing.
+   */
+  protected boolean prefetchFirstBatchFromBothSides() {
+    leftUpstream = next(0, left);
+    rightUpstream = next(1, right);
+
+    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+      state = BatchState.STOP;
+      return false;
+    }
+
+    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+      state = BatchState.OUT_OF_MEMORY;
+      return false;
+    }
+
+    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 65d164d..4a9828c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -61,6 +61,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
     }
     switch (upstream) {
     case NONE:
+      if (state == BatchState.FIRST) {
+        return handleNullInput();
+      }
+      return upstream;
     case NOT_YET:
     case STOP:
       if (state == BatchState.FIRST) {
@@ -125,4 +129,26 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
 
   protected abstract boolean setupNewSchema() throws SchemaChangeException;
   protected abstract IterOutcome doWork();
+
+  /**
+   * Default behavior to handle NULL input (aka FAST NONE), i.e. the incoming batch returns NONE before returning an OK_NEW_SCHEMA.
+   * This could happen when the underlying Scan operators do not produce any batch with a schema.
+   *
+   * <p>
+   * Notice that NULL input is different from input with an empty batch. In the latter case, the input provides
+   * at least one batch, though it's empty.
+   *</p>
+   *
+   * <p>
+   * This behavior can be overridden in an individual operator if the operator's semantics are to
+   * inject a batch with a schema.
+   *</p>
+   *
+   * @return IterOutcome.NONE.
+   */
+  protected IterOutcome handleNullInput() {
+    container.buildSchema(SelectionVectorMode.NONE);
+    return IterOutcome.NONE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
new file mode 100644
index 0000000..9bcea50
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+/**
+ * Wrap a VectorContainer into a record batch.
+ */
+public class SimpleRecordBatch implements RecordBatch {
+  private VectorContainer container;
+  private FragmentContext context;
+
+  public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
+    this.container = container;
+    this.context = context;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public IterOutcome next() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 2152025..3a95d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -26,19 +26,14 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 
 public abstract class AbstractRecordReader implements RecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);
 
-  private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
-  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
-
   // For text reader, the default columns to read is "columns[0]".
   protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));
 
@@ -62,7 +57,7 @@ public abstract class AbstractRecordReader implements RecordReader {
    *                  2) NULL : is NOT allowed. It requires the planner's rule, or GroupScan or ScanBatchCreator to handle NULL.
    */
   protected final void setColumns(Collection<SchemaPath> projected) {
-    Preconditions.checkNotNull(projected, COL_NULL_ERROR);
+    Preconditions.checkNotNull(projected, Utilities.COL_NULL_ERROR);
     isSkipQuery = projected.isEmpty();
     Collection<SchemaPath> columnsToRead = projected;
 
@@ -73,7 +68,7 @@ public abstract class AbstractRecordReader implements RecordReader {
       columnsToRead = getDefaultColumnsToRead();
     }
 
-    isStarQuery = isStarQuery(columnsToRead);
+    isStarQuery = Utilities.isStarQuery(columnsToRead);
     columns = transformColumns(columnsToRead);
 
     logger.debug("columns to read : {}", columns);
@@ -99,15 +94,6 @@ public abstract class AbstractRecordReader implements RecordReader {
     return isSkipQuery;
   }
 
-  public static boolean isStarQuery(Collection<SchemaPath> projected) {
-    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
-      @Override
-      public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
-      }
-    }).isPresent();
-  }
-
   @Override
   public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
     for (final ValueVector v : vectorMap.values()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fa8121e..4b71b0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
@@ -63,7 +64,7 @@ public class ColumnExplorer {
   public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
     this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     this.columns = columns;
-    this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns);
+    this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
     this.selectedPartitionColumns = Lists.newArrayList();
     this.tableColumns = Lists.newArrayList();
     this.allImplicitColumns = initImplicitFileColumns(optionManager);

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 1f7bce9..f81f74e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
       }
 
-    return new ScanBatch(scan, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(scan, context, oContext, readers, implicitColumns);
   }
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index d59cda2..8442c32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
   @Override
   public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index ceb1deb..c406bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -229,7 +229,11 @@ public class JSONRecordReader extends AbstractRecordReader {
            handleAndRaise("Error parsing JSON", ex);
         }
     }
-    jsonReader.ensureAtLeastOneField(writer);
+    // Skip this step for an empty JSON file with 0 rows.
+    // Only when recordCount > 0, ensure the batch has at least one field.
+    if (recordCount > 0) {
+      jsonReader.ensureAtLeastOneField(writer);
+    }
     writer.setValueCount(recordCount);
     updateRunningCount();
     return recordCount;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index 199119d..60581a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -34,6 +34,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
   public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter(), context.getOptions());
-    return new ScanBatch(config, context, Collections.singleton(rr).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(rr));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 9a7563a..8f89eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -47,6 +47,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
         readers.add(new MockRecordReader(context, e));
       }
     }
-    return new ScanBatch(config, context, readers.iterator());
+    return new ScanBatch(config, context, readers);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 4a8c5f3..5ac10e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -429,7 +429,7 @@ public class Metadata {
     List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
-    ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+    ALL_COLS.add(Utilities.STAR_COLUMN);
     boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
     ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 78e9655..84e969a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.work.ExecErrorConstants;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
@@ -281,7 +281,7 @@ public class ParquetReaderUtility {
         // this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
         // creating a NameSegment makes sure we are using the standard code for comparing names,
         // currently it is all case-insensitive
-        if (AbstractRecordReader.isStarQuery(columns)
+        if (Utilities.isStarQuery(columns)
             || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
           int colIndex = -1;
           ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 21fc4ef..6017948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -153,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
 
-    return new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
   }
 
   private static boolean isComplex(ParquetMetadata footer) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 9814b53..10187b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
@@ -225,7 +226,7 @@ public class ParquetSchema {
     for (int i = 0; i < columnsFound.length; i++) {
       SchemaPath col = projectedColumns.get(i);
       assert col != null;
-      if ( ! columnsFound[i] && ! col.equals(ParquetRecordReader.STAR_COLUMN)) {
+      if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) {
         nullFilledVectors.add(createMissingColumn(col, output));
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 2b0ef3f..ab87a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -46,6 +46,6 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
     final Iterator<Object> iterator = table.getIterator(context);
     final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
 
-    return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
+    return new ScanBatch(scan, context, Collections.singletonList(reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 6ee3160..35358c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -17,14 +17,23 @@
  */
 package org.apache.drill.exec.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
+import java.util.Collection;
+
 public class Utilities {
 
+  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+  public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
+
   public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
      /*
      * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
@@ -68,4 +77,18 @@ public class Utilities {
       String v = Utilities.class.getPackage().getImplementationVersion();
       return v;
   }
+
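+  /**
+   * Returns true if the given collection of schema paths contains the star column.
+   * @param projected the schema paths to check; must not be null
+   * @return true if any path in {@code projected} equals {@link #STAR_COLUMN}
+   */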
+  public static boolean isStarQuery(Collection<SchemaPath> projected) {
+    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
+      @Override
+      public boolean apply(SchemaPath path) {
+        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+      }
+    }).isPresent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index d836bfc..4de4c2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 
@@ -178,9 +180,20 @@ public class VectorUtil {
     }
   }
 
+  public static void allocateVectors(Iterable<ValueVector> valueVectors, int count) {
+    for (final ValueVector v : valueVectors) {
+      AllocationHelper.allocateNew(v, count);
+    }
+  }
+
+  public static void setValueCount(Iterable<ValueVector> valueVectors, int count) {
+    for (final ValueVector v : valueVectors) {
+      v.getMutator().setValueCount(count);
+    }
+  }
+
   private static int getColumnWidth(int[] columnWidths, int columnIndex) {
     return (columnWidths == null) ? DEFAULT_COLUMN_WIDTH
         : (columnWidths.length > columnIndex) ? columnWidths[columnIndex] : columnWidths[0];
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 2bc78d4..990a24d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -316,7 +316,9 @@ public class DrillTestWrapper {
    */
   public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
       throws SchemaChangeException, UnsupportedEncodingException {
-    return addToCombinedVectorResults(batches, null);
+    Map<String, List<Object>> combinedVectors = new TreeMap<>();
+    addToCombinedVectorResults(batches, null, combinedVectors);
+    return combinedVectors;
   }
 
   /**
@@ -324,18 +326,20 @@ public class DrillTestWrapper {
    * @param batches
    * @param  expectedSchema: the expected schema the batches should contain. Through SchemaChangeException
    *                       if encounter different batch schema.
-   * @return
+   * @param combinedVectors: the vectors that hold the values collected while iterating over the batches.
+   *
+   * @return number of batches
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
+  public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
        throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
-    Map<String, List<Object>> combinedVectors = new TreeMap<>();
-
+    int numBatch = 0;
     long totalRecords = 0;
     BatchSchema schema = null;
     for (VectorAccessible loader : batches)  {
+      numBatch++;
       if (expectedSchema != null) {
         if (! expectedSchema.equals(loader.getSchema())) {
           throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
@@ -412,12 +416,12 @@ public class DrillTestWrapper {
         }
       }
     }
-    return combinedVectors;
+    return numBatch;
   }
 
   protected void compareSchemaOnly() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    List<QueryDataBatch> actual;
+    List<QueryDataBatch> actual = null;
     QueryDataBatch batch = null;
     try {
       test(testOptionSettingQueries);
@@ -448,8 +452,10 @@ public class DrillTestWrapper {
       }
 
     } finally {
-      if (batch != null) {
-        batch.release();
+      if (actual != null) {
+        for (QueryDataBatch b : actual) {
+          b.release();
+        }
       }
       loader.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 36a713f..acde8ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -39,6 +39,8 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.Text;
@@ -261,6 +263,14 @@ public class TestBuilder {
         expectedNumBatches);
   }
 
+  public SchemaTestBuilder schemaBaseLine(BatchSchema batchSchema) {
+    List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = new ArrayList<>();
+    for (final MaterializedField field : batchSchema) {
+      expectedSchema.add(Pair.of(SchemaPath.getSimplePath(field.getName()), field.getType()));
+    }
+    return schemaBaseLine(expectedSchema);
+  }
+
   public SchemaTestBuilder schemaBaseLine(List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
     assert expectedSchema != null : "The expected schema can be provided once";
     assert baselineColumns == null : "The column information should be captured in expected schema, not baselineColumns";

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 97df2ee..bbfe093 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -416,7 +416,7 @@ public class TestExampleQueries extends BaseTestQuery {
 
   @Test
   public void testCase() throws Exception {
-    test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end from cp.`tpch/nation.parquet` ;");
+    test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end, n_comment from cp.`tpch/nation.parquet` ;");
   }
 
   @Test // tests join condition that has different input types
@@ -1194,5 +1194,4 @@ public class TestExampleQueries extends BaseTestQuery {
         .build()
         .run();
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 6965ab5..63d21ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -586,7 +587,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllRightEmptyBatch() throws Exception {
+  public void testUnionAllRightEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     String queryRightEmptyBatch = String.format(
@@ -606,7 +607,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllLeftEmptyBatch() throws Exception {
+  public void testUnionAllLeftEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     final String queryLeftBatch = String.format(
@@ -627,7 +628,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllBothEmptyBatch() throws Exception {
+  public void testUnionAllBothEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
     final String query = String.format(
         "select key from dfs_test.`%s` where 1 = 0 " +
@@ -638,7 +639,7 @@ public class TestUnionAll extends BaseTestQuery {
 
     final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
     final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
-        .setMinorType(TypeProtos.MinorType.INT)
+        .setMinorType(TypeProtos.MinorType.BIT) // field "key" is boolean type
         .setMode(TypeProtos.DataMode.OPTIONAL)
         .build();
     expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));