You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/09/05 21:05:06 UTC

[1/3] drill git commit: DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

Repository: drill
Updated Branches:
  refs/heads/master e1649dd7d -> fde0a1df1


http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
index 056bc87..9d21bc9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.List;
@@ -583,7 +584,7 @@ public class TestUnionDistinct extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionDistinctRightEmptyBatch() throws Exception {
+  public void testUnionDistinctRightEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     String queryRightEmptyBatch = String.format(
@@ -603,7 +604,7 @@ public class TestUnionDistinct extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionDistinctLeftEmptyBatch() throws Exception {
+  public void testUnionDistinctLeftEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     final String queryLeftBatch = String.format(
@@ -624,7 +625,7 @@ public class TestUnionDistinct extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionDistinctBothEmptyBatch() throws Exception {
+  public void testUnionDistinctBothEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
     final String query = String.format(
         "select key from dfs_test.`%s` where 1 = 0 " +
@@ -635,7 +636,7 @@ public class TestUnionDistinct extends BaseTestQuery {
 
     final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
     final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
-        .setMinorType(TypeProtos.MinorType.INT)
+        .setMinorType(TypeProtos.MinorType.BIT) // field "key" has boolean type.
         .setMode(TypeProtos.DataMode.OPTIONAL)
         .build();
     expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
new file mode 100644
index 0000000..bb707ca
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestEmptyInputSql extends BaseTestQuery {
+
+  public final String SINGLE_EMPTY_JSON = "/scan/emptyInput/emptyJson/empty.json";
+  public final String SINGLE_EMPTY_CSVH = "/scan/emptyInput/emptyCsvH/empty.csvh";
+  public final String SINGLE_EMPTY_CSV = "/scan/emptyInput/emptyCsv/empty.csv";
+
+  /**
+   * Test with query against an empty file. Select clause has regular column reference, and an expression.
+   *
+   * regular column "key" is assigned with nullable-int
+   * expression "key + 100" is materialized with nullable-int as output type.
+   */
+  @Test
+  public void testQueryEmptyJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_JSON).toURI().toString();
+    final String query = String.format("select key, key + 100 as key2 from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("key", TypeProtos.MinorType.INT)
+        .addNullable("key2", TypeProtos.MinorType.INT)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  /**
+   * Test with query against an empty file. Select clause has one or more * (star) columns.
+   * Each star column is expanded into an empty list.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryStarColEmptyJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_JSON).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+
+    final String query2 = String.format("select *, * from dfs_test.`%s` ", rootEmpty);
+
+    testBuilder()
+        .sqlQuery(query2)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  /**
+   * Test with query against an empty file. Select clause has one or more qualified * (star) columns.
+   * Each qualified star column is expanded into an empty list.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryQualifiedStarColEmptyJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_JSON).toURI().toString();
+    final String query1 = String.format("select foo.* from dfs_test.`%s` as foo", rootEmpty);
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+
+    final String query2 = String.format("select foo.*, foo.* from dfs_test.`%s` as foo", rootEmpty);
+
+    testBuilder()
+        .sqlQuery(query2)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+
+  }
+
+  @Test
+  public void testQueryMapArrayEmptyJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_JSON).toURI().toString();
+    final String query = String.format("select foo.a.b as col1, foo.columns[2] as col2, foo.bar.columns[3] as col3 from dfs_test.`%s` as foo", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("col1", TypeProtos.MinorType.INT)
+        .addNullable("col2", TypeProtos.MinorType.INT)
+        .addNullable("col3", TypeProtos.MinorType.INT)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  /**
+   * Test with query against an empty file. Select clause has three expressions.
+   * 1.0 + 100.0 as constant expression, is resolved to required FLOAT8
+   * cast(100 as varchar(100)) is resolved to required varchar(100)
+   * cast(columns as varchar(100)) is resolved to nullable varchar(100).
+   */
+  @Test
+  public void testQueryConstExprEmptyJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_JSON).toURI().toString();
+    final String query = String.format("select 1.0 + 100.0 as key, "
+        + " cast(100 as varchar(100)) as name, "
+        + " cast(columns as varchar(100)) as name2 "
+        + " from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("key", TypeProtos.MinorType.FLOAT8)
+        .add("name", TypeProtos.MinorType.VARCHAR, 100)
+        .addNullable("name2", TypeProtos.MinorType.VARCHAR, 100)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  /**
+   * Test select * against empty csv with empty header. * is expanded into empty list of fields.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryEmptyCsvH() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_CSVH).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  /**
+   * Test select * against empty csv file. * is expanded into "columns : repeated-varchar",
+   * which is the default column from reading a csv file.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryEmptyCsv() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(SINGLE_EMPTY_CSV).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addArray("columns", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 598bdc2..4114a04 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -150,7 +150,7 @@ public class TestPartitionSender extends PlanTestBase {
       Mockito.when(sv.get(i)).thenReturn(i);
     }
 
-    final TopNBatch.SimpleRecordBatch incoming = new TopNBatch.SimpleRecordBatch(container, sv, null);
+    final TopNBatch.SimpleSV4RecordBatch incoming = new TopNBatch.SimpleSV4RecordBatch(container, sv, null);
 
     updateTestCluster(DRILLBITS_COUNT, null);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index 2a392d7..a11889d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -57,7 +57,7 @@ public class TestSimpleUnion extends ExecTest {
     final FragmentContext context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
     final SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
-    final int[] counts = new int[]{100,50};
+    final int[] counts = new int[]{0, 100,50}; // first batch : 0-row schema-only batch.
     int i = 0;
     while(exec.next()) {
       System.out.println("iteration count:" + exec.getRecordCount());

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 4f0fcbf..c53536a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -47,9 +47,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.drill.DrillTestWrapper.addToCombinedVectorResults;
 import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
 import static org.apache.drill.exec.physical.base.AbstractBase.MAX_ALLOCATION;
 
@@ -69,8 +71,9 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
   public static class MiniPlanTestBuilder {
     protected List<Map<String, Object>> baselineRecords;
     protected RecordBatch root;
-    protected boolean expectedZeroBatch;
-    protected BatchSchema expectedSchema;
+    protected Integer expectBatchNum = null;
+    protected BatchSchema expectSchema;
+    protected boolean expectZeroRow;
 
     /**
      * Specify the root operator for a MiniPlan.
@@ -87,8 +90,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
      * @param batchSchema
      * @return
      */
-    public MiniPlanTestBuilder expectedSchema(BatchSchema batchSchema) {
-      this.expectedSchema = batchSchema;
+    public MiniPlanTestBuilder expectSchema(BatchSchema batchSchema) {
+      this.expectSchema = batchSchema;
       return this;
     }
 
@@ -104,11 +107,11 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
 
       Map<String, Object> ret = new HashMap<>();
       int i = 0;
-      Preconditions.checkArgument(expectedSchema != null , "Expected schema should be set before specify baseline values.");
-      Preconditions.checkArgument(baselineValues.length == expectedSchema.getFieldCount(),
+      Preconditions.checkArgument(expectSchema != null , "Expected schema should be set before specify baseline values.");
+      Preconditions.checkArgument(baselineValues.length == expectSchema.getFieldCount(),
           "Must supply the same number of baseline values as columns in expected schema.");
 
-      for (MaterializedField field : expectedSchema) {
+      for (MaterializedField field : expectSchema) {
         ret.put(SchemaPath.getSimplePath(field.getName()).toExpr(), baselineValues[i]);
         i++;
       }
@@ -119,11 +122,28 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
 
     /**
      * Specify one special case, where the operator tree should return 0 batch.
-     * @param expectedZeroBatch
+     * @param expectNullBatch if true, the expected number of batches is set to 0
      * @return
      */
-    public MiniPlanTestBuilder expectZeroBatch(boolean expectedZeroBatch) {
-      this.expectedZeroBatch = expectedZeroBatch;
+    public MiniPlanTestBuilder expectNullBatch(boolean expectNullBatch) {
+      if (expectNullBatch) {
+        this.expectBatchNum = 0;
+      }
+      return this;
+    }
+
+    /**
+     * Specify the expected number of batches from operator tree.
+     * @param expectBatchNum the number of batches expected from the operator tree
+     * @return
+     */
+    public MiniPlanTestBuilder expectBatchNum(int expectBatchNum) {
+      this.expectBatchNum = expectBatchNum;
+      return this;
+    }
+
+    public MiniPlanTestBuilder expectZeroRow(boolean expectedZeroRow) {
+      this.expectZeroRow = expectedZeroRow;
       return this;
     }
 
@@ -131,16 +151,30 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       final BatchIterator batchIterator = new BatchIterator(root);
 
       // verify case of zero batch.
-      if (expectedZeroBatch) {
+      if (expectBatchNum != null && expectBatchNum == 0) {
         if (batchIterator.iterator().hasNext()) {
-          throw new AssertionError("Expected zero batches from scan. But scan return at least 1 batch!");
+          throw new AssertionError("Expected zero batches from operator tree. But operators return at least 1 batch!");
         } else {
           return; // successful
         }
       }
+      Map<String, List<Object>> actualSuperVectors = new TreeMap();
 
-      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectedSchema);
-      Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, actualSuperVectors);
+      if (expectBatchNum != null) {
+        if (expectBatchNum != actualBatchNum) {
+          throw new AssertionError(String.format("Expected %s batches from operator tree. But operators return %s batch!", expectBatchNum, actualBatchNum));
+        }
+      }
+      Map<String, List<Object>> expectedSuperVectors;
+      if (!expectZeroRow) {
+        expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      } else {
+        expectedSuperVectors = new TreeMap<>();
+        for (MaterializedField field : expectSchema) {
+          expectedSuperVectors.put(SchemaPath.getSimplePath(field.getName()).toExpr(), new ArrayList<>());
+        }
+      }
       DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
     }
   }
@@ -221,7 +255,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
     }
 
     /**
-     * Set initial memory reservation used by this operator's allocator. Default is {@link PhysicalOpUnitTestBase#INIT_ALLOCATION}
+     * Set initial memory reservation used by this operator's allocator. Default is {@link org.apache.drill.exec.physical.base.AbstractBase#INIT_ALLOCATION}
      * @param initReservation
      * @return
      */
@@ -231,7 +265,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
     }
 
     /**
-     * Set max memory reservation used by this operator's allocator. Default is {@link PhysicalOpUnitTestBase#MAX_ALLOCATION}
+     * Set max memory reservation used by this operator's allocator. Default is {@link org.apache.drill.exec.physical.base.AbstractBase#MAX_ALLOCATION}
      * @param maxAllocation
      * @return
      */
@@ -366,7 +400,12 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
         readers = TestUtilities.getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
       }
 
-      RecordBatch scanBatch = new ScanBatch(null, fragContext, readers);
+      List<RecordReader> readerList = new ArrayList<>();
+      while(readers.hasNext()) {
+        readerList.add(readers.next());
+      }
+
+      RecordBatch scanBatch = new ScanBatch(null, fragContext, readerList);
       return scanBatch;
     }
   }
@@ -420,7 +459,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
         }
       }
 
-      RecordBatch scanBatch = new ScanBatch(null, fragContext, readers.iterator());
+      RecordBatch scanBatch = new ScanBatch(null, fragContext, readers);
       return scanBatch;
     }
   } // end of ParquetScanBuilder

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 7d09ca5..157f1d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -207,7 +207,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
         if (inputStreamsJSON != null) {
           for (List<String> batchesJson : inputStreamsJSON) {
             incomingStreams.add(new ScanBatch(null, fragContext,
-                getRecordReadersForJsonBatches(batchesJson, fragContext)));
+                getReaderListForJsonBatches(batchesJson, fragContext)));
           }
         }
 
@@ -351,5 +351,13 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     return TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
   }
 
+  private List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
+    Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext);
+    List<RecordReader> readerList = new ArrayList<>();
+    while(readers.hasNext()) {
+      readerList.add(readers.next());
+    }
+    return readerList;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
index d0a64f4..1a52a06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
@@ -30,7 +30,6 @@ import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -54,22 +53,6 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
   }
 
   @Test
-  @Ignore("DRILL-5464: A bug in JsonRecordReader handling empty file")
-  public void testEmptyJsonInput() throws Exception {
-    String emptyFile = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
-
-    RecordBatch scanBatch = new JsonScanBuilder()
-        .fileSystem(fs)
-        .inputPaths(Lists.newArrayList(emptyFile))
-        .build();
-
-    new MiniPlanTestBuilder()
-        .root(scanBatch)
-        .expectZeroBatch(true)
-        .go();
-  }
-
-  @Test
   public void testSimpleParquetScan() throws Exception {
     String file = FileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
 
@@ -85,7 +68,7 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
 
     new MiniPlanTestBuilder()
         .root(scanBatch)
-        .expectedSchema(expectedSchema)
+        .expectSchema(expectedSchema)
         .baselineValues(0L)
         .baselineValues(1L)
         .go();
@@ -107,7 +90,7 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
 
     new MiniPlanTestBuilder()
         .root(scanBatch)
-        .expectedSchema(expectedSchema)
+        .expectSchema(expectedSchema)
         .baselineValues(100L)
         .go();
   }
@@ -149,58 +132,12 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
 
     new MiniPlanTestBuilder()
         .root(batch)
-        .expectedSchema(expectedSchema)
+        .expectSchema(expectedSchema)
         .baselineValues(5l, 1l)
         .baselineValues(5l, 5l)
         .baselineValues(50l, 100l)
         .go();
   }
 
-  @Test
-  @Ignore ("DRILL-5327: A bug in UnionAll handling empty inputs from both sides")
-  public void testUnionFilterAll() throws Exception {
-    List<String> leftJsonBatches = Lists.newArrayList(
-        "[{\"a\": 5, \"b\" : 1 }]");
-
-    List<String> rightJsonBatches = Lists.newArrayList(
-        "[{\"a\": 50, \"b\" : 10 }]");
-
-    RecordBatch leftScan = new JsonScanBuilder()
-        .jsonBatches(leftJsonBatches)
-        .columnsToRead("a", "b")
-        .build();
-
-    RecordBatch leftFilter = new PopBuilder()
-        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
-        .addInput(leftScan)
-        .build();
-
-    RecordBatch rightScan = new JsonScanBuilder()
-        .jsonBatches(rightJsonBatches)
-        .columnsToRead("a", "b")
-        .build();
-
-    RecordBatch rightFilter = new PopBuilder()
-        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
-        .addInput(rightScan)
-        .build();
-
-    RecordBatch batch = new PopBuilder()
-        .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
-        .addInput(leftFilter)
-        .addInput(rightFilter)
-        .build();
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .addNullable("a", TypeProtos.MinorType.BIGINT)
-        .addNullable("b", TypeProtos.MinorType.BIGINT)
-        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
-        .build();
-
-    new MiniPlanTestBuilder()
-        .root(batch)
-        .expectedSchema(expectedSchema)
-        .go();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
new file mode 100644
index 0000000..1127314
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
+  protected static DrillFileSystem fs;
+
+  public final String SINGLE_EMPTY_JSON = "/scan/emptyInput/emptyJson/empty.json";
+  public final String SINGLE_EMPTY_JSON2 = "/scan/emptyInput/emptyJson/empty2.json";
+  public final String SINGLE_JSON = "/scan/jsonTbl/1990/1.json";  // {id: 100, name : "John"}
+  public final String SINGLE_JSON2 = "/scan/jsonTbl/1991/2.json"; // {id: 1000, name : "Joe"}
+
+  @BeforeClass
+  public static void initFS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    fs = new DrillFileSystem(conf);
+  }
+
+  /**
+   * Test ScanBatch with a single empty json file.
+   * @throws Exception
+   */
+  @Test
+  public void testEmptyJsonInput() throws Exception {
+    RecordBatch scanBatch = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectNullBatch(true)
+        .go();
+  }
+
+  /**
+   * Test ScanBatch with mixed json files.
+   * input is empty, data_file, empty, data_file
+   * */
+  @Test
+  public void testJsonInputMixedWithEmptyFiles1() throws Exception {
+    RecordBatch scanBatch = createScanBatchFromJson(SINGLE_EMPTY_JSON, SINGLE_JSON, SINGLE_EMPTY_JSON2, SINGLE_JSON2);
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(100L, "John")
+        .baselineValues(1000L, "Joe")
+        .expectBatchNum(2)
+        .go();
+
+  }
+
+  /**
+   * Test ScanBatch with mixed json files.
+   * input is empty, empty, data_file, data_file
+   * */
+  @Test
+  public void testJsonInputMixedWithEmptyFiles2() throws Exception {
+    RecordBatch scanBatch = createScanBatchFromJson(SINGLE_EMPTY_JSON, SINGLE_EMPTY_JSON2, SINGLE_JSON, SINGLE_JSON2);
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(100L, "John")
+        .baselineValues(1000L, "Joe")
+        .expectBatchNum(2)
+        .go();
+  }
+
+  /**
+   * Test ScanBatch with mixed json files.
+   * input is empty, data_file, data_file, empty
+   * */
+  @Test
+  public void testJsonInputMixedWithEmptyFiles3() throws Exception {
+    RecordBatch scanBatch = createScanBatchFromJson(SINGLE_EMPTY_JSON, SINGLE_JSON, SINGLE_JSON2, SINGLE_EMPTY_JSON2);
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(100L, "John")
+        .baselineValues(1000L, "Joe")
+        .expectBatchNum(2)
+        .go();
+  }
+
+  /**
+   * Test ScanBatch with mixed json files.
+   * input is data_file, data_file, empty, empty
+   * */
+  @Test
+  public void testJsonInputMixedWithEmptyFiles4() throws Exception {
+    RecordBatch scanBatch = createScanBatchFromJson(SINGLE_JSON, SINGLE_JSON2, SINGLE_EMPTY_JSON, SINGLE_EMPTY_JSON2);
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(100L, "John")
+        .baselineValues(1000L, "Joe")
+        .expectBatchNum(2)
+        .go();
+  }
+
+  @Test
+  public void testProjectEmpty() throws Exception {
+    final PhysicalOperator project = new Project(parseExprs("x+5", "x"), null);
+    testSingleInputNullBatchHandling(project);
+  }
+
+  @Test
+  public void testFilterEmpty() throws Exception {
+    final PhysicalOperator filter = new Filter(null, parseExpr("a=5"), 1.0f);
+    testSingleInputNullBatchHandling(filter);
+  }
+
+  @Test
+  public void testHashAggEmpty() throws Exception {
+    final PhysicalOperator hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    testSingleInputNullBatchHandling(hashAgg);
+  }
+
+  @Test
+  public void testStreamingAggEmpty() throws Exception {
+    final PhysicalOperator streamingAgg = new StreamingAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    testSingleInputNullBatchHandling(streamingAgg);
+  }
+
+  @Test
+  public void testSortEmpty() throws Exception {
+    final PhysicalOperator sort = new ExternalSort(null,
+        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
+    testSingleInputNullBatchHandling(sort);
+  }
+
+  @Test
+  public void testLimitEmpty() throws Exception {
+    final PhysicalOperator limit = new Limit(null, 10, 5);
+    testSingleInputNullBatchHandling(limit);
+  }
+
+  @Test
+  public void testFlattenEmpty() throws Exception {
+    final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("col1"));
+    testSingleInputNullBatchHandling(flatten);
+  }
+
+  @Test
+  public void testUnionEmptyBoth() throws Exception {
+    final PhysicalOperator unionAll = new UnionAll(Collections.EMPTY_LIST); // Children list is provided through RecordBatch
+    testTwoInputNullBatchHandling(unionAll);
+  }
+
+  @Test
+  public void testHashJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.INNER);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testLeftHashJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.LEFT);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testRightHashJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.RIGHT);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testFullHashJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.FULL);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testMergeJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.INNER);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testLeftMergeJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.LEFT);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testRightMergeJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.RIGHT);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  @Ignore("Full Merge join is not supported.")
+  public void testFullMergeJoinEmptyBoth() throws Exception {
+    final PhysicalOperator join = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "b")), JoinRelType.FULL);
+    testTwoInputNullBatchHandling(join);
+  }
+
+  @Test
+  public void testUnionLeftEmtpy() throws Exception {
+    final PhysicalOperator unionAll = new UnionAll(Collections.EMPTY_LIST); // Children list is provided through RecordBatch
+
+    RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    String file = FileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+
+    RecordBatch scanBatch = new ParquetScanBuilder()
+        .fileSystem(fs)
+        .columnsToRead("R_REGIONKEY")
+        .inputPaths(Lists.newArrayList(file))
+        .build();
+
+    RecordBatch projectBatch = new PopBuilder()
+        .physicalOperator(new Project(parseExprs("R_REGIONKEY+10", "regionkey"), null))
+        .addInput(scanBatch)
+        .build();
+
+    RecordBatch unionBatch = new PopBuilder()
+        .physicalOperator(unionAll)
+        .addInput(left)
+        .addInput(projectBatch)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("regionkey", TypeProtos.MinorType.BIGINT)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(unionBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(10L)
+        .baselineValues(11L)
+        .go();
+  }
+
+
+  @Test
+  public void testHashJoinLeftEmpty() throws Exception {
+    RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : 10 }]");
+
+    RecordBatch rightScan = new JsonScanBuilder()
+        .jsonBatches(rightJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch joinBatch = new PopBuilder()
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER))
+        .addInput(left)
+        .addInput(rightScan)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(joinBatch)
+        .expectSchema(expectedSchema)
+        .expectZeroRow(true)
+        .go();
+  }
+
+  @Test
+  public void testHashJoinRightEmpty() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : 10 }]");
+
+    RecordBatch leftScan = new JsonScanBuilder()
+        .jsonBatches(leftJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch right = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    RecordBatch joinBatch = new PopBuilder()
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.INNER))
+        .addInput(leftScan)
+        .addInput(right)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(joinBatch)
+        .expectSchema(expectedSchema)
+        .expectZeroRow(true)
+        .go();
+  }
+
+
+  @Test
+  public void testLeftHashJoinLeftEmpty() throws Exception {
+    RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : 10 }]");
+
+    RecordBatch rightScan = new JsonScanBuilder()
+        .jsonBatches(rightJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch joinBatch = new PopBuilder()
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.LEFT))
+        .addInput(left)
+        .addInput(rightScan)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(joinBatch)
+        .expectSchema(expectedSchema)
+        .expectZeroRow(true)
+        .go();
+  }
+
+  @Test
+  public void testLeftHashJoinRightEmpty() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : 10 }]");
+
+    RecordBatch leftScan = new JsonScanBuilder()
+        .jsonBatches(leftJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch right = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    RecordBatch joinBatch = new PopBuilder()
+        .physicalOperator(new HashJoinPOP(null, null, Lists.newArrayList(joinCond("a", "EQUALS", "a2")), JoinRelType.LEFT))
+        .addInput(leftScan)
+        .addInput(right)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.BIGINT)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(joinBatch)
+        .expectSchema(expectedSchema)
+        .baselineValues(50L, 10L)
+        .go();
+  }
+
+  @Test
+  public void testUnionFilterAll() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : \"name1\" }]");
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 50, \"b\" : \"name2\" }]");
+
+    RecordBatch leftScan = new JsonScanBuilder()
+        .jsonBatches(leftJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch leftFilter = new PopBuilder()
+        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+        .addInput(leftScan)
+        .build();
+
+    RecordBatch rightScan = new JsonScanBuilder()
+        .jsonBatches(rightJsonBatches)
+        .columnsToRead("a", "b")
+        .build();
+
+    RecordBatch rightFilter = new PopBuilder()
+        .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f))
+        .addInput(rightScan)
+        .build();
+
+    RecordBatch batch = new PopBuilder()
+        .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch
+        .addInput(leftFilter)
+        .addInput(rightFilter)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .addNullable("b", TypeProtos.MinorType.VARCHAR)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch)
+        .expectSchema(expectedSchema)
+        .expectZeroRow(true)
+        .go();
+  }
+
+  @Test
+  public void testOutputProjectEmpty() throws Exception {
+    final PhysicalOperator project = new Project(
+        parseExprs(
+        "x", "col1",
+        "x + 100", "col2",
+        "100.0", "col3",
+        "cast(nonExist as varchar(100))", "col4"), null, true);
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("col1", TypeProtos.MinorType.INT)
+        .addNullable("col2", TypeProtos.MinorType.INT)
+        .add("col3", TypeProtos.MinorType.FLOAT8)
+        .addNullable("col4", TypeProtos.MinorType.VARCHAR, 100)
+        .withSVMode(BatchSchema.SelectionVectorMode.NONE)
+        .build();
+
+    final RecordBatch input = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    RecordBatch batch = new PopBuilder()
+        .physicalOperator(project) // Children list is provided through RecordBatch
+        .addInput(input)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch)
+        .expectSchema(expectedSchema)
+        .expectZeroRow(true)
+        .go();
+  }
+
+  /**
+   * Given a physical operator, first construct a scan batch from a single empty json file, then construct a scan
+   * batch from multiple empty json files. In both cases, verify that the output is a NullBatch.
+   * @param pop the single-input physical operator that is placed on top of the empty scan batch
+   * @throws Exception if building or running either mini plan fails
+   */
+  private void testSingleInputNullBatchHandling(PhysicalOperator pop) throws Exception {
+    final RecordBatch input = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    RecordBatch batch = new PopBuilder()
+        .physicalOperator(pop)
+        .addInput(input)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch)
+        .expectNullBatch(true)
+        .go();
+
+    final RecordBatch input2 = createScanBatchFromJson(SINGLE_EMPTY_JSON, SINGLE_EMPTY_JSON2);
+    RecordBatch batch2 = new PopBuilder()
+        .physicalOperator(pop)
+        .addInput(input2)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(batch2)
+        .expectNullBatch(true)
+        .go();
+  }
+
+  private void testTwoInputNullBatchHandling(PhysicalOperator pop) throws Exception {
+    RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+    RecordBatch right = createScanBatchFromJson(SINGLE_EMPTY_JSON);
+
+    RecordBatch joinBatch = new PopBuilder()
+        .physicalOperator(pop)
+        .addInput(left)
+        .addInput(right)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(joinBatch)
+        .expectNullBatch(true)
+        .go();
+  }
+
+  private RecordBatch createScanBatchFromJson(String... resourcePaths) throws Exception {
+    List<String> inputPaths = new ArrayList<>();
+
+    for (String resource : resourcePaths) {
+      inputPaths.add(FileUtils.getResourceAsFile(resource).toURI().toString());
+    }
+
+    RecordBatch scanBatch = new JsonScanBuilder()
+        .fileSystem(fs)
+        .inputPaths(inputPaths)
+        .build();
+
+    return scanBatch;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
index 3974448..d1a16df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
@@ -20,9 +20,13 @@ package org.apache.drill.exec.store;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
@@ -36,6 +40,9 @@ public class TestImplicitFileColumns extends BaseTestQuery {
   public static final String MAIN = "main";
   public static final String NESTED = "nested";
   public static final String CSV = "csv";
+  public final String JSON_TBL = "/scan/jsonTbl"; // 1990/1.json : {id:100, name: "John"}, 1991/2.json : {id: 1000, name : "Joe"}
+  public final String PARQUET_TBL = "/multilevel/parquet/";  // 1990/Q1/orders_1990_q1.parquet, ...
+  public final String CSV_TBL = "/multilevel/csv";  // 1990/Q1/orders_1990_q1.csv, ..
 
   private static final JsonStringArrayList<Text> mainColumnValues = new JsonStringArrayList<Text>() {{
     add(new Text(MAIN));
@@ -147,4 +154,66 @@ public class TestImplicitFileColumns extends BaseTestQuery {
     }
   }
 
+  @Test
+  public void testStarColumnJson() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(JSON_TBL).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
+        .addNullable("id", TypeProtos.MinorType.BIGINT)
+        .addNullable("name", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testStarColumnParquet() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(PARQUET_TBL).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
+        .addNullable("dir1", TypeProtos.MinorType.VARCHAR)
+        .add("o_orderkey", TypeProtos.MinorType.INT)
+        .add("o_custkey", TypeProtos.MinorType.INT)
+        .add("o_orderstatus", TypeProtos.MinorType.VARCHAR)
+        .add("o_totalprice", TypeProtos.MinorType.FLOAT8)
+        .add("o_orderdate", TypeProtos.MinorType.DATE)
+        .add("o_orderpriority", TypeProtos.MinorType.VARCHAR)
+        .add("o_clerk", TypeProtos.MinorType.VARCHAR)
+        .add("o_shippriority", TypeProtos.MinorType.INT)
+        .add("o_comment", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testStarColumnCsv() throws Exception {
+    final String rootEmpty = FileUtils.getResourceAsFile(CSV_TBL).toURI().toString();
+    final String query1 = String.format("select * from dfs_test.`%s` ", rootEmpty);
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
+        .addNullable("dir1", TypeProtos.MinorType.VARCHAR)
+        .addArray("columns", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query1)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsv/empty.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsv/empty.csv b/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsv/empty.csv
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsvH/empty.csvh
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsvH/empty.csvh b/exec/java-exec/src/test/resources/scan/emptyInput/emptyCsvH/empty.csvh
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty.json b/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty2.json b/exec/java-exec/src/test/resources/scan/emptyInput/emptyJson/empty2.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/jsonTbl/1990/1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/jsonTbl/1990/1.json b/exec/java-exec/src/test/resources/scan/jsonTbl/1990/1.json
new file mode 100644
index 0000000..e9f1e9a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/scan/jsonTbl/1990/1.json
@@ -0,0 +1,2 @@
+{id: 100, name : "John"}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/resources/scan/jsonTbl/1991/2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/scan/jsonTbl/1991/2.json b/exec/java-exec/src/test/resources/scan/jsonTbl/1991/2.json
new file mode 100644
index 0000000..7496839
--- /dev/null
+++ b/exec/java-exec/src/test/resources/scan/jsonTbl/1991/2.json
@@ -0,0 +1 @@
+{id: 1000, name : "Joe"}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
index 016199a..c589ed7 100644
--- a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
+++ b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
@@ -16,6 +16,9 @@
  * limitations under the License.
  */
 
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="/org/apache/drill/exec/expr/BasicTypeHelper.java" />
 
@@ -108,6 +111,8 @@ public class BasicTypeHelper {
 </#list>
     case GENERIC_OBJECT      :
       return ObjectVector.class  ;
+    case NULL:
+      return UntypedNullVector.class;
     default:
       break;
     }
@@ -271,6 +276,8 @@ public class BasicTypeHelper {
 </#list>
     case GENERIC_OBJECT:
       return new ObjectVector(field, allocator)        ;
+    case NULL:
+      return new UntypedNullVector(field, allocator);
     default:
       break;
     }
@@ -348,6 +355,9 @@ public class BasicTypeHelper {
     case GENERIC_OBJECT:
       ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
       return;
+    case NULL:
+      ((UntypedNullVector) vector).getMutator().setSafe(index, (UntypedNullHolder) holder);
+      return; // value was set; do not fall through to the unsupported-type default
     default:
       throw new UnsupportedOperationException(buildErrorMessage("set value", type));
     }
@@ -376,7 +385,11 @@ public class BasicTypeHelper {
       </#list>
       case GENERIC_OBJECT:
         ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
+        return; // without this the ObjectVector would be cast to UntypedNullVector in the next case
+      case NULL:
+        ((UntypedNullVector) vector).getMutator().setSafe(index, (UntypedNullHolder) holder);
+        return; // value was set; do not fall through to the unsupported-type default
       default:
         throw new UnsupportedOperationException(buildErrorMessage("set value safe", type));
     }
   }
@@ -428,6 +439,8 @@ public class BasicTypeHelper {
       </#list>
       case GENERIC_OBJECT:
         return new ObjectHolder();
+    case NULL:
+        return new UntypedNullHolder();
       default:
         throw new UnsupportedOperationException(buildErrorMessage("create value holder", type));
     }
@@ -451,6 +464,8 @@ public class BasicTypeHelper {
       }
       </#list>
       </#list>
+    case NULL:
+      return true;
       default:
         throw new UnsupportedOperationException(buildErrorMessage("check is null", type));
     }
@@ -532,7 +547,9 @@ public class BasicTypeHelper {
       }
     </#list>
     </#list>
-
+    else if (holder instanceof UntypedNullHolder) {
+      return UntypedNullHolder.TYPE;
+    }
     throw new UnsupportedOperationException("ValueHolder is not supported for 'getValueHolderType' method.");
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/vector/src/main/codegen/templates/ValueHolders.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/ValueHolders.java b/exec/vector/src/main/codegen/templates/ValueHolders.java
index 11607b4..f134049 100644
--- a/exec/vector/src/main/codegen/templates/ValueHolders.java
+++ b/exec/vector/src/main/codegen/templates/ValueHolders.java
@@ -35,9 +35,7 @@ package org.apache.drill.exec.expr.holders;
 public final class ${className} implements ValueHolder{
   
   public static final MajorType TYPE = Types.${mode.name?lower_case}(MinorType.${minor.class?upper_case});
-  
-  public MajorType getType() {return TYPE;}
-  
+
     <#if mode.name == "Repeated">
     
     /** The first index (inclusive) into the Vector. **/
@@ -93,7 +91,9 @@ public final class ${className} implements ValueHolder{
       return ((buffer.getInt(start) & 0x80000000) != 0);
     }
     </#if></#if>
-    
+
+    public MajorType getType() {return TYPE;}
+
     @Deprecated
     public int hashCode(){
       throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 1ecedc6..e2b44a7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -207,6 +207,16 @@ public class MaterializedField {
     return builder.toString();
 }
 
+  /**
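+   * Returns true if this field and the given field have identical MinorType and Mode.
+   * @param that the field to compare with this one
+   * @return true if both the MinorType and the Mode are equal, false otherwise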
+   */
+  public boolean hasSameTypeAndMode(MaterializedField that) {
+    return (getType().getMinorType() == that.getType().getMinorType())
+        && (getType().getMode() == that.getType().getMode());
+  }
+
   private String toString(Collection<?> collection, int maxLen) {
     StringBuilder builder = new StringBuilder();
     builder.append(" [");

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java
new file mode 100644
index 0000000..a205eda
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+
+public class UntypedNullHolder implements ValueHolder {
+  public static final TypeProtos.MajorType TYPE = Types.optional(TypeProtos.MinorType.NULL);
+  public static final int WIDTH = 0;
+  public int isSet = 1;
+
+  public TypeProtos.MajorType getType() {return TYPE;}
+
+  @Deprecated
+  public int hashCode(){
+    throw new UnsupportedOperationException();
+  }
+
+  /*
+   * Reason for deprecation is that ValueHolders are potential scalar replacements
+   * and hence we don't want any methods to be invoked on them.
+   */
+  @Deprecated
+  public String toString(){
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
new file mode 100644
index 0000000..8288fe2
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.vector;
+
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import static org.apache.calcite.sql.parser.impl.SqlParserImplConstants.C;
+
+/** UntypedNullVector represents a value vector of type {@link org.apache.drill.common.types.TypeProtos.MinorType#NULL}.
+ *  Every value in the vector carries two semantic implications: 1) the value is unknown, 2) the type is unknown.
+ *  Because of this, we only have to keep track of the number of values in the value vector,
+ *  and there is no allocated buffer backing this value vector. Therefore, the majority of
+ *  methods in this class are either no-ops or throw {@link UnsupportedOperationException}.
+ *
+ */
+public final class UntypedNullVector extends BaseDataValueVector implements FixedWidthVector {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UntypedNullVector.class);
+
+  /**
+   * Width of each fixed-width value.
+   */
+  public static final int VALUE_WIDTH = 0;
+
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private int valueCount;
+
+  public UntypedNullVector(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+    valueCount = 0;
+  }
+
+  @Override
+  public FieldReader getReader() { throw new UnsupportedOperationException(); }
+
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+    return 0;
+  }
+
+  @Override
+  public int getValueCapacity(){
+    return ValueVector.MAX_ROW_COUNT;
+  }
+
+  @Override
+  public Accessor getAccessor() { return accessor; }
+
+  @Override
+  public Mutator getMutator() { return mutator; }
+
+  @Override
+  public void setInitialCapacity(final int valueCount) {
+  }
+
+  @Override
+  public void allocateNew() {
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    return true;
+  }
+
+  @Override
+  public void allocateNew(final int valueCount) {
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void zeroVector() {
+  }
+
+  @Override
+  public void load(SerializedField metadata, DrillBuf buffer) {
+    Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()),
+        "The field %s doesn't match the provided metadata %s.", this.field, metadata);
+    final int actualLength = metadata.getBufferLength();
+    final int valueCount = metadata.getValueCount();
+    final int expectedLength = valueCount * VALUE_WIDTH;
+    assert actualLength == expectedLength : String.format("Expected to load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
+
+    this.valueCount = valueCount;
+  }
+
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator){
+    return new TransferImpl(getField(), allocator);
+  }
+
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator){
+    return new TransferImpl(getField().withPath(ref), allocator);
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    return new TransferImpl((UntypedNullVector) to);
+  }
+
+  public void transferTo(UntypedNullVector target){
+  }
+
+  public void splitAndTransferTo(int startIndex, int length, UntypedNullVector target) {
+  }
+
+  @Override
+  public int getPayloadByteCount(int valueCount) {
+    return 0;
+  }
+
+  private class TransferImpl implements TransferPair{
+    private UntypedNullVector to;
+
+    public TransferImpl(MaterializedField field, BufferAllocator allocator){
+      to = new UntypedNullVector(field, allocator);
+    }
+
+    public TransferImpl(UntypedNullVector to) {
+      this.to = to;
+    }
+
+    @Override
+    public UntypedNullVector getTo(){
+      return to;
+    }
+
+    @Override
+    public void transfer(){
+      transferTo(to);
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      checkBounds(startIndex);
+      checkBounds(startIndex + length - 1);
+      splitAndTransferTo(startIndex, length, to);
+    }
+
+    @Override
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      checkBounds(fromIndex);
+      to.copyFromSafe(fromIndex, toIndex, UntypedNullVector.this);
+    }
+  }
+
+  public void copyFrom(int fromIndex, int thisIndex, UntypedNullVector from){
+  }
+
+  public void copyFromSafe(int fromIndex, int thisIndex, UntypedNullVector from){
+  }
+
+  private void checkBounds(int index) {
+    if (index < 0 || index >= valueCount) {
+      throw new IndexOutOfBoundsException(String.format(
+          "index: %d, expected: range(0, %d-1))", index, valueCount));
+    }
+  }
+  @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    // No-op, like copyFrom/copyFromSafe: VALUE_WIDTH is 0, so an untyped-null entry has no bytes to copy.
+  }
+
+  public final class Accessor extends BaseAccessor {
+    @Override
+    public int getValueCount() {
+      return valueCount;
+    }
+
+    @Override
+    public boolean isNull(int index){
+      checkBounds(index);
+      return true;
+    }
+
+    public int isSet(int index) {
+      checkBounds(index);
+      return 0;
+    }
+
+    @Override
+    public Object getObject(int index) {
+      checkBounds(index);
+      return null;
+    }
+
+    public void get(int index, UntypedNullHolder holder) {
+      checkBounds(index);
+    }
+
+  }
+
+  /**
+   * UntypedNullVector.Mutator throws {@link UnsupportedOperationException} for all of its value-setting
+   * operations; only the operations that set the value count are supported.
+   *
+   */
+   public final class Mutator extends BaseMutator {
+
+    private Mutator() {}
+
+    public void set(int index, UntypedNullHolder holder) {
+      throw new UnsupportedOperationException("UntypedNullVector does not support set");
+    }
+
+    public void set(int index, int isSet, UntypedNullHolder holder) {
+      throw new UnsupportedOperationException("UntypedNullVector does not support set");
+    }
+
+    public void setSafe(int index, UntypedNullHolder holder) {
+      throw new UnsupportedOperationException("UntypedNullVector does not support setSafe");
+    }
+
+    public void setSafe(int index, int isSet, UntypedNullHolder holder) {
+      throw new UnsupportedOperationException("UntypedNullVector does not support setSafe");
+    }
+
+    public void setScalar(int index, UntypedNullHolder holder) throws VectorOverflowException {
+      throw new UnsupportedOperationException("UntypedNullVector does not support setScalar");
+    }
+
+    public void setArrayItem(int index, UntypedNullHolder holder) throws VectorOverflowException {
+      throw new UnsupportedOperationException("UntypedNullVector does not support setArrayItem");
+    }
+
+    @Override
+    public void generateTestData(int size) {
+      setValueCount(size);
+    }
+
+    public void generateTestDataAlt(int size) {
+      setValueCount(size);
+    }
+
+    @Override
+    public void setValueCount(int valueCount) {
+      UntypedNullVector.this.valueCount = valueCount;
+    }
+  }
+
+}
\ No newline at end of file


[2/3] drill git commit: DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

Posted by jn...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 5afe66b..4d623cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,18 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.union;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -39,88 +36,96 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.resolver.TypeCastRules;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
 
-  private List<MaterializedField> outputFields;
+  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private UnionAllInput unionAllInput;
-  private RecordBatch current;
-
   private final List<TransferPair> transfers = Lists.newArrayList();
-  private List<ValueVector> allocationVectors;
-  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private List<ValueVector> allocationVectors = Lists.newArrayList();
   private int recordCount = 0;
-  private boolean schemaAvailable = false;
+  private UnionInputIterator unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
-    super(config, context, false);
-    assert (children.size() == 2) : "The number of the operands of Union must be 2";
-    unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
-  }
-
-  @Override
-  public int getRecordCount() {
-    return recordCount;
+    super(config, context, true, children.get(0), children.get(1));
   }
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
-    unionAllInput.getLeftRecordBatch().kill(sendUpstream);
-    unionAllInput.getRightRecordBatch().kill(sendUpstream);
+    left.kill(sendUpstream);
+    right.kill(sendUpstream);
   }
 
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
-  }
+  protected void buildSchema() throws SchemaChangeException {
+    if (! prefetchFirstBatchFromBothSides()) {
+      return;
+    }
 
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
+    unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
+
+    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsOneSide(right.getSchema());
+    } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsOneSide((left.getSchema()));
+    } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    VectorAccessibleUtilities.allocateVectors(container, 0);
+    VectorAccessibleUtilities.setValueCount(container,0);
   }
 
   @Override
   public IterOutcome innerNext() {
     try {
-      IterOutcome upstream = unionAllInput.nextBatch();
-      logger.debug("Upstream of Union-All: {}", upstream);
-      switch (upstream) {
+      while (true) {
+        if (!unionInputIterator.hasNext()) {
+          return IterOutcome.NONE;
+        }
+
+        Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+        IterOutcome upstream = nextBatch.left;
+        RecordBatch incoming = nextBatch.right;
+
+        switch (upstream) {
         case NONE:
         case OUT_OF_MEMORY:
         case STOP:
           return upstream;
-
         case OK_NEW_SCHEMA:
-          outputFields = unionAllInput.getOutputFields();
+          return doWork(nextBatch.right, true);
         case OK:
-          IterOutcome workOutcome = doWork();
-
-          if (workOutcome != IterOutcome.OK) {
-            return workOutcome;
-          } else {
-            return upstream;
+          // skip batches with same schema as the previous one yet having 0 row.
+          if (incoming.getRecordCount() == 0) {
+            VectorAccessibleUtilities.clear(incoming);
+            continue;
           }
+          return doWork(nextBatch.right, false);
         default:
           throw new IllegalStateException(String.format("Unknown state %s.", upstream));
+        }
       }
     } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
       context.fail(ex);
@@ -130,120 +135,75 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
+  public int getRecordCount() {
+    return recordCount;
   }
 
-  private void setValueCount(int count) {
-    for (ValueVector v : allocationVectors) {
-      ValueVector.Mutator m = v.getMutator();
-      m.setValueCount(count);
-    }
-  }
-
-  private boolean doAlloc() {
-    for (ValueVector v : allocationVectors) {
-      try {
-        AllocationHelper.allocateNew(v, current.getRecordCount());
-      } catch (OutOfMemoryException ex) {
-        return false;
-      }
-    }
-    return true;
-  }
 
   @SuppressWarnings("resource")
-  private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
-    if (allocationVectors != null) {
-      for (ValueVector v : allocationVectors) {
-        v.clear();
-      }
+  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
+    Preconditions.checkArgument(inputBatch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
+        "Input batch and output batch have different field counthas!");
+
+    if (newSchema) {
+      createUnionAller(inputBatch);
     }
 
-    allocationVectors = Lists.newArrayList();
-    transfers.clear();
+    container.zeroVectors();
+    VectorUtil.allocateVectors(allocationVectors, inputBatch.getRecordCount());
+    recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+    VectorUtil.setValueCount(allocationVectors, recordCount);
 
-    // If both sides of Union-All are empty
-    if (unionAllInput.isBothSideEmpty()) {
-      for (MaterializedField materializedField : outputFields) {
-        final String colName = materializedField.getName();
-        final MajorType majorType = MajorType.newBuilder()
-            .setMinorType(MinorType.INT)
-            .setMode(DataMode.OPTIONAL)
-            .build();
-
-        MaterializedField outputField = MaterializedField.create(colName, majorType);
-        ValueVector vv = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vv);
-      }
-
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      return IterOutcome.OK;
     }
+  }
+
+  private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
+    transfers.clear();
+    allocationVectors.clear();
 
     final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    cg.getCodeGenerator().saveCodeForDebugging(true);
+    //    cg.getCodeGenerator().saveCodeForDebugging(true);
+
     int index = 0;
-    for (VectorWrapper<?> vw : current) {
-       ValueVector vvIn = vw.getValueVector();
-      // get the original input column names
-      SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
-      // get the renamed column names
-      SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getName());
+    for(VectorWrapper<?> vw : inputBatch) {
+      ValueVector vvIn = vw.getValueVector();
+      ValueVector vvOut = container.getValueVector(index).getValueVector();
 
       final ErrorCollector collector = new ErrorCollectorImpl();
       // According to input data names, Minortypes, Datamodes, choose to
       // transfer directly,
       // rename columns or
       // cast data types (Minortype or DataMode)
-      if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
+      if (container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField())
+          && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+          ) {
         // Transfer column
+        TransferPair tp = vvIn.makeTransferPair(vvOut);
+        transfers.add(tp);
+      } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
+        continue;
+      } else { // Copy data in order to rename the column
+        SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
+        MaterializedField inField = vvIn.getField();
+        MaterializedField outputField = vvOut.getField();
 
-        MajorType outputFieldType = outputFields.get(index).getType();
-        MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
-                                                                  outputFieldType);
-
-        /*
-          todo: Fix if condition when DRILL-4824 is merged
-          If condition should be changed to:
-          `if (outputFields.get(index).getName().equals(inputPath.getRootSegmentPath())) {`
-          DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
-          Root cause is missing indication of child column in map types when it is null.
-          DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
-          Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
-          Unit test - TestJsonReader.testKvgenWithUnionAll().
-         */
-        if (outputFields.get(index).getName().equals(inputPath)) {
-          ValueVector vvOut = container.addOrGet(outputField);
-          TransferPair tp = vvIn.makeTransferPair(vvOut);
-          transfers.add(tp);
-        // Copy data in order to rename the column
-        } else {
-          final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
-          if (collector.hasErrors()) {
-            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-          }
+        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
 
-          ValueVector vv = container.addOrGet(outputField, callBack);
-          allocationVectors.add(vv);
-          TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-          ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
-          cg.addExpr(write);
-        }
-      // Cast is necessary
-      } else {
-        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
         if (collector.hasErrors()) {
           throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
         }
 
         // If the inputs' DataMode is required and the outputs' DataMode is not required
         // cast to the one with the least restriction
-        if (vvIn.getField().getType().getMode() == DataMode.REQUIRED
-            && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
-          expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
+        if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
+            && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
+          expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
           if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
           }
@@ -251,442 +211,163 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
 
         // If two inputs' MinorTypes are different,
         // Insert a cast before the Union operation
-        if (vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
-          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
+        if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
+          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
           if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
           }
         }
 
-        final MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
-                                                                        expr.getMajorType());
-        ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
-        TypedFieldId fid = container.getValueVectorId(outputPath);
+        TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
 
-        boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         cg.addExpr(write);
+
+        allocationVectors.add(vvOut);
       }
       ++index;
     }
 
     unionall = context.getImplementationClass(cg.getCodeGenerator());
-    unionall.setup(context, current, this, transfers);
-
-    if (!schemaAvailable) {
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      schemaAvailable = true;
-    }
-
-    if (!doAlloc()) {
-      return IterOutcome.OUT_OF_MEMORY;
-    }
-
-    recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
-    setValueCount(recordCount);
-    return IterOutcome.OK;
-  }
-
-  public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
-    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
-        && (leftField.getType().getMode() == rightField.getType().getMode());
+    unionall.setup(context, inputBatch, this, transfers);
   }
 
-  // This method is used by inner class to point the reference `current` to the correct record batch
-  private void setCurrentRecordBatch(RecordBatch target) {
-    this.current = target;
-  }
 
-  // This method is used by inner class to clear the current record batch
-  private void clearCurrentRecordBatch() {
-    for (VectorWrapper<?> v: current) {
-      v.clear();
-    }
-  }
-
-  public static class UnionAllInput {
-    private UnionAllRecordBatch unionAllRecordBatch;
-    private List<MaterializedField> outputFields;
-    private OneSideInput leftSide;
-    private OneSideInput rightSide;
-    private IterOutcome upstream = IterOutcome.NOT_YET;
-    private boolean leftIsFinish = false;
-    private boolean rightIsFinish = false;
-
-    // These two schemas are obtained from the first record batches of the left and right inputs
-    // They are used to check if the schema is changed between recordbatches
-    private BatchSchema leftSchema;
-    private BatchSchema rightSchema;
-    private boolean bothEmpty = false;
-
-    public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
-      this.unionAllRecordBatch = unionAllRecordBatch;
-      leftSide = new OneSideInput(left);
-      rightSide = new OneSideInput(right);
-    }
-
-    private void setBothSideEmpty(boolean bothEmpty) {
-      this.bothEmpty = bothEmpty;
-    }
-
-    private boolean isBothSideEmpty() {
-      return bothEmpty;
-    }
-
-    public IterOutcome nextBatch() throws SchemaChangeException {
-      if (upstream == RecordBatch.IterOutcome.NOT_YET) {
-        IterOutcome iterLeft = leftSide.nextBatch();
-        switch (iterLeft) {
-          case OK_NEW_SCHEMA:
-            /*
-             * If the first few record batches are all empty,
-             * there is no way to tell whether these empty batches are coming from empty files.
-             * It is incorrect to infer output types when either side could be coming from empty.
-             *
-             * Thus, while-loop is necessary to skip those empty batches.
-             */
-            whileLoop:
-            while (leftSide.getRecordBatch().getRecordCount() == 0) {
-              iterLeft = leftSide.nextBatch();
-
-              switch(iterLeft) {
-                case STOP:
-                case OUT_OF_MEMORY:
-                  return iterLeft;
-
-                case NONE:
-                  // Special Case: The left side was an empty input.
-                  leftIsFinish = true;
-                  break whileLoop;
-
-                case NOT_YET:
-                case OK_NEW_SCHEMA:
-                case OK:
-                  continue whileLoop;
-
-                default:
-                  throw new IllegalStateException(
-                      String.format("Unexpected state %s.", iterLeft));
-              }
-            }
-
-            break;
-          case STOP:
-          case OUT_OF_MEMORY:
-            return iterLeft;
-
-          default:
-            throw new IllegalStateException(
-                String.format("Unexpected state %s.", iterLeft));
-        }
-
-        IterOutcome iterRight = rightSide.nextBatch();
-        switch (iterRight) {
-          case OK_NEW_SCHEMA:
-            // Unless there is no record batch on the left side of the inputs,
-            // always start processing from the left side.
-            if (leftIsFinish) {
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-            } else {
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-            }
-            // If the record count of the first batch from right input is zero,
-            // there are two possibilities:
-            // 1. The right side is an empty input (e.g., file).
-            // 2. There will be more records carried by later batches.
-
-            /*
-             * If the first few record batches are all empty,
-             * there is no way to tell whether these empty batches are coming from empty files.
-             * It is incorrect to infer output types when either side could be coming from empty.
-             *
-             * Thus, while-loop is necessary to skip those empty batches.
-             */
-            whileLoop:
-            while (rightSide.getRecordBatch().getRecordCount() == 0) {
-              iterRight = rightSide.nextBatch();
-              switch (iterRight) {
-                case STOP:
-                case OUT_OF_MEMORY:
-                  return iterRight;
-
-                case NONE:
-                  // Special Case: The right side was an empty input.
-                  rightIsFinish = true;
-                  break whileLoop;
-
-                case NOT_YET:
-                case OK_NEW_SCHEMA:
-                case OK:
-                  continue whileLoop;
-
-                default:
-                  throw new IllegalStateException(
-                      String.format("Unexpected state %s.", iterRight));
-              }
-            }
-
-            if (leftIsFinish && rightIsFinish) {
-              setBothSideEmpty(true);
-            }
-
-            inferOutputFields();
-            break;
-
-          case STOP:
-          case OUT_OF_MEMORY:
-            return iterRight;
-
-          default:
-            throw new IllegalStateException(
-                String.format("Unexpected state %s.", iterRight));
-        }
-
-
-
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-        return upstream;
+  // The output table's column names always follow the left table,
+  // where the output type is chosen based on DRILL's implicit casting rules
+  private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
+//    outputFields = Lists.newArrayList();
+    final Iterator<MaterializedField> leftIter = leftSchema.iterator();
+    final Iterator<MaterializedField> rightIter = rightSchema.iterator();
+
+    int index = 1;
+    while (leftIter.hasNext() && rightIter.hasNext()) {
+      MaterializedField leftField  = leftIter.next();
+      MaterializedField rightField = rightIter.next();
+
+      if (leftField.hasSameTypeAndMode(rightField)) {
+        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+        builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
+        container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
+      } else if (Types.isUntypedNull(rightField.getType())) {
+        container.addOrGet(leftField, callBack);
+      } else if (Types.isUntypedNull(leftField.getType())) {
+        container.addOrGet(MaterializedField.create(leftField.getName(), rightField.getType()), callBack);
       } else {
-        if (isBothSideEmpty()) {
-          return IterOutcome.NONE;
-        }
-
-        unionAllRecordBatch.clearCurrentRecordBatch();
-
-        if (leftIsFinish && rightIsFinish) {
-          upstream = IterOutcome.NONE;
-          return upstream;
-        } else if (leftIsFinish) {
-          IterOutcome iterOutcome = rightSide.nextBatch();
-
-          switch (iterOutcome) {
-            case NONE:
-              rightIsFinish = true;
-              // fall through
-            case STOP:
-            case OUT_OF_MEMORY:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK_NEW_SCHEMA:
-              if (!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
-                throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
-              }
-              iterOutcome = IterOutcome.OK;
-              // fall through
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-          }
-        } else if (rightIsFinish) {
-          IterOutcome iterOutcome = leftSide.nextBatch();
-          switch (iterOutcome) {
-            case STOP:
-            case OUT_OF_MEMORY:
-            case NONE:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
-          }
-        } else {
-          IterOutcome iterOutcome = leftSide.nextBatch();
-
-          switch (iterOutcome) {
-            case STOP:
-            case OUT_OF_MEMORY:
-              upstream = iterOutcome;
-              return upstream;
-
-            case OK_NEW_SCHEMA:
-              if (!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
-                throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
-              }
-
-              iterOutcome = IterOutcome.OK;
-              // fall through
-            case OK:
-              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-              upstream = iterOutcome;
-              return upstream;
-
-            case NONE:
-              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
-              upstream = IterOutcome.OK;
-              leftIsFinish = true;
-              return upstream;
-
-            default:
-              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-          }
-        }
-      }
-    }
-
-    /**
-     *
-     * Summarize the inference in the four different situations:
-     * First of all, the field names are always determined by the left side
-     * (Even when the left side is from an empty file, we have the column names.)
-     *
-     * Cases:
-     * 1. Left: non-empty; Right: non-empty
-     *      types determined by both sides with implicit casting involved
-     * 2. Left: empty; Right: non-empty
-     *      type from the right
-     * 3. Left: non-empty; Right: empty
-     *      types from the left
-     * 4. Left: empty; Right: empty
-     *      types are nullable integer
-     */
-    private void inferOutputFields() {
-      if (!leftIsFinish && !rightIsFinish) {
-        // Both sides are non-empty
-        inferOutputFieldsBothSide();
-      } else if (!rightIsFinish) {
-        // Left side is non-empty
-        // While use left side's column names as output column names,
-        // use right side's column types as output column types.
-        inferOutputFieldsFromSingleSide(
-            leftSide.getRecordBatch().getSchema(),
-            rightSide.getRecordBatch().getSchema());
-      } else {
-        // Either right side is empty or both are empty
-        // Using left side's schema is sufficient
-        inferOutputFieldsFromSingleSide(
-            leftSide.getRecordBatch().getSchema(),
-            leftSide.getRecordBatch().getSchema());
-      }
-    }
-
-    // The output table's column names always follow the left table,
-    // where the output type is chosen based on DRILL's implicit casting rules
-    private void inferOutputFieldsBothSide() {
-      outputFields = Lists.newArrayList();
-      leftSchema = leftSide.getRecordBatch().getSchema();
-      rightSchema = rightSide.getRecordBatch().getSchema();
-      Iterator<MaterializedField> leftIter = leftSchema.iterator();
-      Iterator<MaterializedField> rightIter = rightSchema.iterator();
-
-      int index = 1;
-      while (leftIter.hasNext() && rightIter.hasNext()) {
-        MaterializedField leftField  = leftIter.next();
-        MaterializedField rightField = rightIter.next();
-
-        if (hasSameTypeAndMode(leftField, rightField)) {
-          MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+        // If the output type is not the same,
+        // cast the column of one of the table to a data type which is the Least Restrictive
+        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
+        if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
+          builder.setMinorType(leftField.getType().getMinorType());
           builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
-          outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
         } else {
-          // If the output type is not the same,
-          // cast the column of one of the table to a data type which is the Least Restrictive
-          MajorType.Builder builder = MajorType.newBuilder();
-          if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
-            builder.setMinorType(leftField.getType().getMinorType());
-            builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
-          } else {
-            List<MinorType> types = Lists.newLinkedList();
-            types.add(leftField.getType().getMinorType());
-            types.add(rightField.getType().getMinorType());
-            MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
-            if (outputMinorType == null) {
-              throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
-                  " on the left side and " + rightField.getType().getMinorType().toString() +
-                  " on the right side in column " + index + " of UNION ALL");
-            }
-            builder.setMinorType(outputMinorType);
+          List<TypeProtos.MinorType> types = Lists.newLinkedList();
+          types.add(leftField.getType().getMinorType());
+          types.add(rightField.getType().getMinorType());
+          TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
+          if (outputMinorType == null) {
+            throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
+                " on the left side and " + rightField.getType().getMinorType().toString() +
+                " on the right side in column " + index + " of UNION ALL");
           }
-
-          // The output data mode should be as flexible as the more flexible one from the two input tables
-          List<DataMode> dataModes = Lists.newLinkedList();
-          dataModes.add(leftField.getType().getMode());
-          dataModes.add(rightField.getType().getMode());
-          builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
-
-          outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
+          builder.setMinorType(outputMinorType);
         }
-        ++index;
-      }
 
-      assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
-    }
-
-    private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
-      outputFields = Lists.newArrayList();
+        // The output data mode should be as flexible as the more flexible one from the two input tables
+        List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
+        dataModes.add(leftField.getType().getMode());
+        dataModes.add(rightField.getType().getMode());
+        builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
 
-      final List<String> outputColumnNames = Lists.newArrayList();
-      for (MaterializedField materializedField : schemaForNames) {
-        outputColumnNames.add(materializedField.getName());
-      }
-
-      final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
-      for (int i = 0; iterForTypes.hasNext(); ++i) {
-        MaterializedField field = iterForTypes.next();
-        outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
+        container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
       }
+      ++index;
     }
 
-    public List<MaterializedField> getOutputFields() {
-      if (outputFields == null) {
-        throw new NullPointerException("Output fields have not been inferred");
-      }
-
-      return outputFields;
-    }
+    assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
+  }
 
-    public void killIncoming(boolean sendUpstream) {
-      leftSide.getRecordBatch().kill(sendUpstream);
-      rightSide.getRecordBatch().kill(sendUpstream);
+  private void inferOutputFieldsOneSide(final BatchSchema schema) {
+    for (MaterializedField field : schema) {
+      container.addOrGet(field, callBack);
     }
+  }
 
-    public RecordBatch getLeftRecordBatch() {
-      return leftSide.getRecordBatch();
-    }
+  private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
+    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
+        && (leftField.getType().getMode() == rightField.getType().getMode());
+  }
 
-    public RecordBatch getRightRecordBatch() {
-      return rightSide.getRecordBatch();
+  private class BatchStatusWrappper {
+    boolean prefetched;
+    final RecordBatch batch;
+    final int inputIndex;
+    final IterOutcome outcome;
+
+    BatchStatusWrappper(boolean prefetched, IterOutcome outcome, RecordBatch batch, int inputIndex) {
+      this.prefetched = prefetched;
+      this.outcome = outcome;
+      this.batch = batch;
+      this.inputIndex = inputIndex;
     }
+  }
 
-    private class OneSideInput {
-      private IterOutcome upstream = IterOutcome.NOT_YET;
-      private RecordBatch recordBatch;
+  private class UnionInputIterator implements Iterator<Pair<IterOutcome, RecordBatch>> {
+    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
-      public OneSideInput(RecordBatch recordBatch) {
-        this.recordBatch = recordBatch;
+    UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
+      if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
+        batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, right, 1));
       }
 
-      public RecordBatch getRecordBatch() {
-        return recordBatch;
+      if (leftOutCome == IterOutcome.OK_NEW_SCHEMA) {
+        batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, left, 0));
       }
+    }
 
-      public IterOutcome nextBatch() {
-        if (upstream == IterOutcome.NONE) {
-          throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-        }
+    @Override
+    public boolean hasNext() {
+      return ! batchStatusStack.isEmpty();
+    }
 
-        if (upstream == IterOutcome.NOT_YET) {
-          upstream = unionAllRecordBatch.next(recordBatch);
+    @Override
+    public Pair<IterOutcome, RecordBatch> next() {
+      while (!batchStatusStack.isEmpty()) {
+        BatchStatusWrappper topStatus = batchStatusStack.peek();
 
-          return upstream;
+        if (topStatus.prefetched) {
+          topStatus.prefetched = false;
+          return Pair.of(topStatus.outcome, topStatus.batch);
         } else {
-          do {
-            upstream = unionAllRecordBatch.next(recordBatch);
-          } while (upstream == IterOutcome.OK && recordBatch.getRecordCount() == 0);
-
-          return upstream;
+          IterOutcome outcome = UnionAllRecordBatch.this.next(topStatus.inputIndex, topStatus.batch);
+          switch (outcome) {
+          case OK:
+          case OK_NEW_SCHEMA:
+            return Pair.of(outcome, topStatus.batch);
+          case OUT_OF_MEMORY:
+          case STOP:
+            batchStatusStack.pop();
+            return Pair.of(outcome, topStatus.batch);
+          case NONE:
+            batchStatusStack.pop();
+            if (batchStatusStack.isEmpty()) {
+              return Pair.of(IterOutcome.NONE, null);
+            }
+            break;
+          default:
+            throw new IllegalStateException(String.format("Unexpected state %s", outcome));
+          }
         }
       }
+
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 2be1ed5..a8ee0de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -249,14 +249,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
           // OK doesn't change high-level state.
           break;
         case NONE:
-          // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if
-          // already terminated (checked above).
-          if (validationState != ValidationState.HAVE_SCHEMA) {
-            throw new IllegalStateException(
-                String.format(
-                    "next() returned %s without first returning %s [#%d, %s]",
-                    batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
-          }
+          // NONE is allowed even without first seeing an OK_NEW_SCHEMA. Such a NONE is
+          // called a FAST NONE.
           // NONE moves to terminal high-level state.
           validationState = ValidationState.TERMINAL;
           break;
@@ -306,12 +300,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                   "Incoming batch [#%d, %s] has a null schema. This is not allowed.",
                   instNum, batchTypeName));
         }
-        if (lastSchema.getFieldCount() == 0) {
-          throw new IllegalStateException(
-              String.format(
-                  "Incoming batch [#%d, %s] has an empty schema. This is not allowed.",
-                  instNum, batchTypeName));
-        }
+        // It's legal for a batch to have zero fields. For instance, a relational table could have
+        // zero columns. Querying such a table requires execution operators to process batches with 0 fields.
         if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
           throw new IllegalStateException(
               String.format(

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index 2298df5..a8eddbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -39,6 +39,6 @@ public class ValuesBatchCreator implements BatchCreator<Values> {
     assert children.isEmpty();
 
     JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(), null, Collections.singletonList(SchemaPath.getSimplePath("*")));
-    return new ScanBatch(config, context, Iterators.singletonIterator((RecordReader) reader));
+    return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 7e4483b..df80a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -42,8 +42,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.type.RelDataType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import org.apache.drill.exec.util.Utilities;
 
 /**
  * GroupScan of a Drill table.
@@ -160,12 +159,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     final ScanStats stats = groupScan.getScanStats(settings);
     int columnCount = getRowType().getFieldCount();
     double ioCost = 0;
-    boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return Preconditions.checkNotNull(input).equals("*");
-      }
-    }).isPresent();
+    boolean isStarQuery = Utilities.isStarQuery(columns);
 
     if (isStarQuery) {
       columnCount = STAR_COLUMN_COST;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 25cd717..d974bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -35,18 +35,45 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 
+/**
+ * A physical Prel node for Project operator.
+ */
 public class ProjectPrel extends DrillProjectRelBase implements Prel{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectPrel.class);
 
+  private final boolean outputProj;
 
   public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
       RelDataType rowType) {
+    this(cluster, traits, child, exps, rowType, false);
+  }
+
+  /**
+   * Constructor for ProjectPrel.
+   * @param cluster cluster of ProjectPrel node
+   * @param traits traits of ProjectPrel node
+   * @param child  input
+   * @param exps   list of RexNode, representing expressions of projection.
+   * @param rowType output rowType of projection expression.
+   * @param outputProj true if ProjectPrel is inserted by {@link org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor}.
+   *                   Such a top Project operator does the following processing before the result is presented to Screen/Writer:
+   *                   <ol>
+   *                   <li>ensure final output field names are preserved</li>
+   *                   <li>handle cases where input does not return any batch (a fast NONE) (see ProjectRecordBatch.handleNullInput() method)</li>
+   *                   <li>handle cases where expressions in upstream operator were evaluated to NULL type
+   *                   (Null type will be converted into Nullable-INT)</li>
+   *                   </ol>
+   *                   false otherwise.
+   */
+  public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+      RelDataType rowType, boolean outputProj) {
     super(DRILL_PHYSICAL, cluster, traits, child, exps, rowType);
+    this.outputProj = outputProj;
   }
 
   @Override
   public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> exps, RelDataType rowType) {
-    return new ProjectPrel(getCluster(), traitSet, input, exps, rowType);
+    return new ProjectPrel(getCluster(), traitSet, input, exps, rowType, this.outputProj);
   }
 
 
@@ -57,7 +84,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     org.apache.drill.exec.physical.config.Project p = new org.apache.drill.exec.physical.config.Project(
-        this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))),  childPOP);
+        this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))),  childPOP, outputProj);
     return creator.addMetadata(this, p);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 587b006..08bd9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -133,9 +133,23 @@ public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeExcept
         prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
 
     RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
-    ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
-
-    return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject;
+    ProjectPrel topProject = new ProjectPrel(prel.getCluster(),
+        prel.getTraitSet(),
+        prel,
+        projections,
+        newRowType,
+        true);  //outputProj = true : NONE -> OK_NEW_SCHEMA, also handle expression with NULL type.
+
+    if (prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true)) {
+      return new ProjectPrel(prel.getCluster(),
+          prel.getTraitSet(),
+          ((Project) prel).getInput(),
+          ((Project) prel).getProjects(),
+          prel.getRowType(),
+          true); //outputProj = true : NONE -> OK_NEW_SCHEMA, also handle expression with NULL type.
+    } else {
+      return topProject;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
new file mode 100644
index 0000000..1137922
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends  AbstractRecordBatch<T> {
+  protected final RecordBatch left;
+  protected final RecordBatch right;
+
+  // state (IterOutcome) of the left input
+  protected IterOutcome leftUpstream = IterOutcome.NONE;
+
+  // state (IterOutcome) of the right input
+  protected IterOutcome rightUpstream = IterOutcome.NONE;
+
+  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left,
+      RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, true, context.newOperatorContext(popConfig));
+    this.left = left;
+    this.right = right;
+  }
+
+  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, RecordBatch left,
+      RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, buildSchema);
+    this.left = left;
+    this.right = right;
+  }
+
+  /**
+   * Prefetch first batch from both inputs.
+   * @return true if caller should continue processing
+   *         false if caller should stop and exit from processing.
+   */
+  protected boolean prefetchFirstBatchFromBothSides() {
+    leftUpstream = next(0, left);
+    rightUpstream = next(1, right);
+
+    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+      state = BatchState.STOP;
+      return false;
+    }
+
+    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+      state = BatchState.OUT_OF_MEMORY;
+      return false;
+    }
+
+    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 65d164d..4a9828c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -61,6 +61,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
     }
     switch (upstream) {
     case NONE:
+      if (state == BatchState.FIRST) {
+        return handleNullInput();
+      }
+      return upstream;
     case NOT_YET:
     case STOP:
       if (state == BatchState.FIRST) {
@@ -125,4 +129,26 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
 
   protected abstract boolean setupNewSchema() throws SchemaChangeException;
   protected abstract IterOutcome doWork();
+
+  /**
+   * Default behavior to handle NULL input (aka FAST NONE), i.e. the incoming batch returns NONE before returning an OK_NEW_SCHEMA.
+   * This could happen when the underlying Scan operators do not produce any batch with schema.
+   *
+   * <p>
+   * Notice that NULL input is different from input with an empty batch. In the latter case, input provides
+   * at least a batch, though it's empty.
+   *</p>
+   *
+   * <p>
+   * This behavior could be overridden in each individual operator, if the operator's semantics is to
+   * inject a batch with schema.
+   *</p>
+   *
+   * @return IterOutcome.NONE.
+   */
+  protected IterOutcome handleNullInput() {
+    container.buildSchema(SelectionVectorMode.NONE);
+    return IterOutcome.NONE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
new file mode 100644
index 0000000..9bcea50
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+/**
+ * Wrap a VectorContainer into a record batch.
+ */
+public class SimpleRecordBatch implements RecordBatch {
+  private VectorContainer container;
+  private FragmentContext context;
+
+  public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
+    this.container = container;
+    this.context = context;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public IterOutcome next() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 2152025..3a95d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -26,19 +26,14 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 
 public abstract class AbstractRecordReader implements RecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);
 
-  private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
-  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
-
   // For text reader, the default columns to read is "columns[0]".
   protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));
 
@@ -62,7 +57,7 @@ public abstract class AbstractRecordReader implements RecordReader {
    *                  2) NULL : is NOT allowed. It requires the planner's rule, or GroupScan or ScanBatchCreator to handle NULL.
    */
   protected final void setColumns(Collection<SchemaPath> projected) {
-    Preconditions.checkNotNull(projected, COL_NULL_ERROR);
+    Preconditions.checkNotNull(projected, Utilities.COL_NULL_ERROR);
     isSkipQuery = projected.isEmpty();
     Collection<SchemaPath> columnsToRead = projected;
 
@@ -73,7 +68,7 @@ public abstract class AbstractRecordReader implements RecordReader {
       columnsToRead = getDefaultColumnsToRead();
     }
 
-    isStarQuery = isStarQuery(columnsToRead);
+    isStarQuery = Utilities.isStarQuery(columnsToRead);
     columns = transformColumns(columnsToRead);
 
     logger.debug("columns to read : {}", columns);
@@ -99,15 +94,6 @@ public abstract class AbstractRecordReader implements RecordReader {
     return isSkipQuery;
   }
 
-  public static boolean isStarQuery(Collection<SchemaPath> projected) {
-    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
-      @Override
-      public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
-      }
-    }).isPresent();
-  }
-
   @Override
   public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
     for (final ValueVector v : vectorMap.values()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fa8121e..4b71b0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
@@ -63,7 +64,7 @@ public class ColumnExplorer {
   public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
     this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     this.columns = columns;
-    this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns);
+    this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
     this.selectedPartitionColumns = Lists.newArrayList();
     this.tableColumns = Lists.newArrayList();
     this.allImplicitColumns = initImplicitFileColumns(optionManager);

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 1f7bce9..f81f74e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
       }
 
-    return new ScanBatch(scan, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(scan, context, oContext, readers, implicitColumns);
   }
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index d59cda2..8442c32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
   @Override
   public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index ceb1deb..c406bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -229,7 +229,11 @@ public class JSONRecordReader extends AbstractRecordReader {
            handleAndRaise("Error parsing JSON", ex);
         }
     }
-    jsonReader.ensureAtLeastOneField(writer);
+    // Skip empty json file with 0 row.
+    // Only when data source has > 0 row, ensure the batch has one field.
+    if (recordCount > 0) {
+      jsonReader.ensureAtLeastOneField(writer);
+    }
     writer.setValueCount(recordCount);
     updateRunningCount();
     return recordCount;

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index 199119d..60581a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -34,6 +34,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
   public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter(), context.getOptions());
-    return new ScanBatch(config, context, Collections.singleton(rr).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(rr));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 9a7563a..8f89eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -47,6 +47,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
         readers.add(new MockRecordReader(context, e));
       }
     }
-    return new ScanBatch(config, context, readers.iterator());
+    return new ScanBatch(config, context, readers);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 4a8c5f3..5ac10e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -429,7 +429,7 @@ public class Metadata {
     List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
-    ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+    ALL_COLS.add(Utilities.STAR_COLUMN);
     boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
     ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 78e9655..84e969a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.work.ExecErrorConstants;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
@@ -281,7 +281,7 @@ public class ParquetReaderUtility {
         // this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
         // creating a NameSegment makes sure we are using the standard code for comparing names,
         // currently it is all case-insensitive
-        if (AbstractRecordReader.isStarQuery(columns)
+        if (Utilities.isStarQuery(columns)
             || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
           int colIndex = -1;
           ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 21fc4ef..6017948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -153,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
 
-    return new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
   }
 
   private static boolean isComplex(ParquetMetadata footer) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 9814b53..10187b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
@@ -225,7 +226,7 @@ public class ParquetSchema {
     for (int i = 0; i < columnsFound.length; i++) {
       SchemaPath col = projectedColumns.get(i);
       assert col != null;
-      if ( ! columnsFound[i] && ! col.equals(ParquetRecordReader.STAR_COLUMN)) {
+      if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) {
         nullFilledVectors.add(createMissingColumn(col, output));
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 2b0ef3f..ab87a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -46,6 +46,6 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
     final Iterator<Object> iterator = table.getIterator(context);
     final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
 
-    return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
+    return new ScanBatch(scan, context, Collections.singletonList(reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 6ee3160..35358c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -17,14 +17,23 @@
  */
 package org.apache.drill.exec.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
+import java.util.Collection;
+
 public class Utilities {
 
+  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+  public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
+
   public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
      /*
      * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
@@ -68,4 +77,18 @@ public class Utilities {
       String v = Utilities.class.getPackage().getImplementationVersion();
       return v;
   }
+
+  /**
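+   * Returns true if the given collection of schema paths contains the star column.
+   * @param projected the projected columns to inspect; must not be null
+   * @return true if at least one path equals {@link #STAR_COLUMN}, false otherwise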
+   */
+  public static boolean isStarQuery(Collection<SchemaPath> projected) {
+    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
+      @Override
+      public boolean apply(SchemaPath path) {
+        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+      }
+    }).isPresent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index d836bfc..4de4c2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 
@@ -178,9 +180,20 @@ public class VectorUtil {
     }
   }
 
+  public static void allocateVectors(Iterable<ValueVector> valueVectors, int count) {
+    for (final ValueVector v : valueVectors) {
+      AllocationHelper.allocateNew(v, count);
+    }
+  }
+
+  public static void setValueCount(Iterable<ValueVector> valueVectors, int count) {
+    for (final ValueVector v : valueVectors) {
+      v.getMutator().setValueCount(count);
+    }
+  }
+
   private static int getColumnWidth(int[] columnWidths, int columnIndex) {
     return (columnWidths == null) ? DEFAULT_COLUMN_WIDTH
         : (columnWidths.length > columnIndex) ? columnWidths[columnIndex] : columnWidths[0];
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 2bc78d4..990a24d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -316,7 +316,9 @@ public class DrillTestWrapper {
    */
   public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
       throws SchemaChangeException, UnsupportedEncodingException {
-    return addToCombinedVectorResults(batches, null);
+    Map<String, List<Object>> combinedVectors = new TreeMap<>();
+    addToCombinedVectorResults(batches, null, combinedVectors);
+    return combinedVectors;
   }
 
   /**
@@ -324,18 +326,20 @@ public class DrillTestWrapper {
    * @param batches
    * @param  expectedSchema: the expected schema the batches should contain. Through SchemaChangeException
    *                       if encounter different batch schema.
-   * @return
+   * @param combinedVectors: the vectors to hold the values when iterate the batches.
+   *
+   * @return number of batches
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
+  public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
        throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
-    Map<String, List<Object>> combinedVectors = new TreeMap<>();
-
+    int numBatch = 0;
     long totalRecords = 0;
     BatchSchema schema = null;
     for (VectorAccessible loader : batches)  {
+      numBatch++;
       if (expectedSchema != null) {
         if (! expectedSchema.equals(loader.getSchema())) {
           throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
@@ -412,12 +416,12 @@ public class DrillTestWrapper {
         }
       }
     }
-    return combinedVectors;
+    return numBatch;
   }
 
   protected void compareSchemaOnly() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    List<QueryDataBatch> actual;
+    List<QueryDataBatch> actual = null;
     QueryDataBatch batch = null;
     try {
       test(testOptionSettingQueries);
@@ -448,8 +452,10 @@ public class DrillTestWrapper {
       }
 
     } finally {
-      if (batch != null) {
-        batch.release();
+      if (actual != null) {
+        for (QueryDataBatch b : actual) {
+          b.release();
+        }
       }
       loader.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 36a713f..acde8ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -39,6 +39,8 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.Text;
@@ -261,6 +263,14 @@ public class TestBuilder {
         expectedNumBatches);
   }
 
+  public SchemaTestBuilder schemaBaseLine(BatchSchema batchSchema) {
+    List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = new ArrayList<>();
+    for (final MaterializedField field : batchSchema) {
+      expectedSchema.add(Pair.of(SchemaPath.getSimplePath(field.getName()), field.getType()));
+    }
+    return schemaBaseLine(expectedSchema);
+  }
+
   public SchemaTestBuilder schemaBaseLine(List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
     assert expectedSchema != null : "The expected schema can be provided once";
     assert baselineColumns == null : "The column information should be captured in expected schema, not baselineColumns";

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 97df2ee..bbfe093 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -416,7 +416,7 @@ public class TestExampleQueries extends BaseTestQuery {
 
   @Test
   public void testCase() throws Exception {
-    test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end from cp.`tpch/nation.parquet` ;");
+    test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end, n_comment from cp.`tpch/nation.parquet` ;");
   }
 
   @Test // tests join condition that has different input types
@@ -1194,5 +1194,4 @@ public class TestExampleQueries extends BaseTestQuery {
         .build()
         .run();
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 6965ab5..63d21ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -586,7 +587,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllRightEmptyBatch() throws Exception {
+  public void testUnionAllRightEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     String queryRightEmptyBatch = String.format(
@@ -606,7 +607,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllLeftEmptyBatch() throws Exception {
+  public void testUnionAllLeftEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
 
     final String queryLeftBatch = String.format(
@@ -627,7 +628,7 @@ public class TestUnionAll extends BaseTestQuery {
   }
 
   @Test
-  public void testUnionAllBothEmptyBatch() throws Exception {
+  public void testUnionAllBothEmptyDataBatch() throws Exception {
     String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
     final String query = String.format(
         "select key from dfs_test.`%s` where 1 = 0 " +
@@ -638,7 +639,7 @@ public class TestUnionAll extends BaseTestQuery {
 
     final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
     final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
-        .setMinorType(TypeProtos.MinorType.INT)
+        .setMinorType(TypeProtos.MinorType.BIT) // field "key" is boolean type
         .setMode(TypeProtos.DataMode.OPTIONAL)
         .build();
     expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));


[3/3] drill git commit: DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

Posted by jn...@apache.org.
DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

1. Modify ScanBatch's logic when it iterates list of RecordReader.
   1) Skip RecordReader if it returns 0 row && present same schema. A new schema (by calling Mutator.isNewSchema() ) means either a new top level field is added, or a field in a nested field is added, or an existing field type is changed.
   2) Implicit columns are presumed to have constant schema, and are added to outgoing container before any regular column is added in.
   3) ScanBatch will return NONE directly (called "fast NONE") if all its RecordReaders have empty input and thus are skipped, instead of returning OK_NEW_SCHEMA first.

2. Modify IteratorValidatorBatchIterator to allow
   1) fast NONE ( before seeing a OK_NEW_SCHEMA)
   2) batch with empty list of columns.

2. Modify JsonRecordReader when it gets 0 rows. Do not insert a nullable-int column for 0-row input. Together with ScanBatch, Drill will skip empty JSON files.

3. Modify binary operators such as join, union to handle fast none for either one side or both sides. Abstract the logic in AbstractBinaryRecordBatch, except for MergeJoin as its implementation is quite different from others.

4. Fix and refactor union all operator.
  1) Correct the union operator's handling of 0 input rows. Previously, it would ignore inputs with 0 rows and put nullable-int into the output schema, which caused various schema change issues in downstream operators. The new behavior is to take a schema with 0 rows into account
  in determining the output schema, in the same way as with > 0 input rows. By doing that, we ensure the Union operator will not behave like a schema-lossy operator.
  2) Add a UnionInputIterator to simplify the logic to iterate the left/right inputs, removing a significant chunk of duplicate code in the previous implementation.
  The new union all operator reduces the code size by half compared to the old one.

5. Introduce UntypedNullVector to handle the convertFromJSON() function when the input batch contains 0 rows.
  Problem: The function convertFromJSON() is different from other regular functions in that it only knows the output schema after evaluation is performed. When the input has 0 rows, Drill essentially does not have
  a way to know the output type, and previously would assume Map type. That worked under the assumption that other operators like Union would ignore a batch with 0 rows, which is no longer
  the case in the current implementation.
  Solution: Use MinorType.NULL as the output type for convertFromJSON() when the input contains 0 rows. The new UntypedNullVector is used to represent a column with MinorType.NULL.

6. HBaseGroupScan converts the star column into a list of row_key and column families. HBaseRecordReader should reject column star since it expects star to have been converted somewhere else.
  In HBase a column family always has map type, and a non-rowkey column always has nullable varbinary type; this ensures that HBaseRecordReaders across different HBase regions will have the same top level schema, even if the region is
  empty or prunes all the rows due to filter pushdown optimization. In other words, we will not see different top level schemas from different HBaseRecordReaders for the same table.
  However, such a change will not be able to handle a hard schema change: c1 exists in cf1 in one region, but not in another region. Further work is required to handle hard schema changes.

7. Modify scan cost estimation when the query involves * column. This is to remove the planning randomness since previously two different operators could have same cost.

8. Add a new flag 'outputProj' to Project operator, to indicate if Project is for the query's final output. Such Project is added by TopProjectVisitor, to handle fast NONE when all the inputs to the query are empty
and are skipped.
  1) column star is replaced with empty list
  2) regular column reference is replaced with nullable-int column
  3) An expression will go through ExpressionTreeMaterializer, and use the type of materialized expression as the output type
  4) Return an OK_NEW_SCHEMA with the schema using the above logic, then return a NONE to down-stream operator.

9. Add unit test to test operators handling empty input.

10. Add unit test to test query when inputs are all empty.

DRILL-5546: Revise code based on review comments.

Handle implicit column in scan batch. Change interface in ScanBatch's constructor.
 1) Ensure either the implicit column list is empty, or all the readers have the same set of implicit columns.
 2) We could skip the implicit columns when checking whether there is a schema change coming from the record reader.
 3) ScanBatch accepts a list instead of an iterator, since we may need to go through the implicit column list multiple times, and verify that the sizes of the two lists are the same.

ScanBatch code review comments. Add more unit tests.

Share code path in ProjectBatch to handle normal setupNewSchema() and handleNullInput().
 - Move SimpleRecordBatch out of TopNBatch to make it sharable across different places.
 - Add unit test to verify schema for star column query against multilevel tables.

Unit test framework change
 - Fix memory leak in unit test framework.
 - Allow SchemaTestBuilder to pass in BatchSchema.

close #906


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fde0a1df
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fde0a1df
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fde0a1df

Branch: refs/heads/master
Commit: fde0a1df1734e0742b49aabdd28b02202ee2b044
Parents: e1649dd
Author: Jinfeng Ni <jn...@apache.org>
Authored: Wed May 17 16:08:00 2017 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Tue Sep 5 12:07:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |   9 +
 .../store/mapr/db/MapRDBScanBatchCreator.java   |   2 +-
 .../mapr/db/json/MaprDBJsonRecordReader.java    |   5 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  93 ++-
 .../exec/store/hbase/HBaseRecordReader.java     |   5 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java |   2 +-
 .../hive/HiveDrillNativeScanBatchCreator.java   |   6 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |   4 +-
 .../exec/store/hive/HiveScanBatchCreator.java   |   2 +-
 .../drill/exec/store/jdbc/JdbcBatchCreator.java |   2 +-
 .../exec/store/kudu/KuduScanBatchCreator.java   |   2 +-
 .../exec/store/mongo/MongoRecordReader.java     |   3 +-
 .../exec/store/mongo/MongoScanBatchCreator.java |   2 +-
 .../src/main/codegen/templates/TypeHelper.java  |   2 +
 .../drill/exec/physical/config/Project.java     |  23 +-
 .../drill/exec/physical/impl/ScanBatch.java     | 407 +++++-----
 .../exec/physical/impl/TopN/TopNBatch.java      |  73 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |  33 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |   6 +
 .../physical/impl/join/NestedLoopJoinBatch.java |  31 +-
 .../impl/project/ProjectRecordBatch.java        | 117 ++-
 .../impl/union/UnionAllRecordBatch.java         | 737 ++++++-------------
 .../IteratorValidatorBatchIterator.java         |  18 +-
 .../impl/values/ValuesBatchCreator.java         |   2 +-
 .../exec/planner/logical/DrillScanRel.java      |  10 +-
 .../exec/planner/physical/ProjectPrel.java      |  31 +-
 .../physical/visitor/TopProjectVisitor.java     |  20 +-
 .../exec/record/AbstractBinaryRecordBatch.java  |  75 ++
 .../exec/record/AbstractSingleRecordBatch.java  |  26 +
 .../drill/exec/record/SimpleRecordBatch.java    |  98 +++
 .../drill/exec/store/AbstractRecordReader.java  |  20 +-
 .../apache/drill/exec/store/ColumnExplorer.java |   3 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   2 +-
 .../exec/store/direct/DirectBatchCreator.java   |   2 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   6 +-
 .../store/ischema/InfoSchemaBatchCreator.java   |   2 +-
 .../exec/store/mock/MockScanBatchCreator.java   |   2 +-
 .../drill/exec/store/parquet/Metadata.java      |   4 +-
 .../store/parquet/ParquetReaderUtility.java     |   4 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   2 +-
 .../parquet/columnreaders/ParquetSchema.java    |   3 +-
 .../exec/store/sys/SystemTableBatchCreator.java |   2 +-
 .../org/apache/drill/exec/util/Utilities.java   |  23 +
 .../org/apache/drill/exec/util/VectorUtil.java  |  15 +-
 .../java/org/apache/drill/DrillTestWrapper.java |  24 +-
 .../test/java/org/apache/drill/TestBuilder.java |  10 +
 .../org/apache/drill/TestExampleQueries.java    |   3 +-
 .../java/org/apache/drill/TestUnionAll.java     |   9 +-
 .../org/apache/drill/TestUnionDistinct.java     |   9 +-
 .../apache/drill/exec/TestEmptyInputSql.java    | 203 +++++
 .../partitionsender/TestPartitionSender.java    |   2 +-
 .../physical/impl/union/TestSimpleUnion.java    |   2 +-
 .../physical/unit/MiniPlanUnitTestBase.java     |  75 +-
 .../physical/unit/PhysicalOpUnitTestBase.java   |  10 +-
 .../drill/exec/physical/unit/TestMiniPlan.java  |  69 +-
 .../physical/unit/TestNullInputMiniPlan.java    | 572 ++++++++++++++
 .../exec/store/TestImplicitFileColumns.java     |  69 ++
 .../scan/emptyInput/emptyCsv/empty.csv          |   0
 .../scan/emptyInput/emptyCsvH/empty.csvh        |   0
 .../scan/emptyInput/emptyJson/empty.json        |   0
 .../scan/emptyInput/emptyJson/empty2.json       |   0
 .../src/test/resources/scan/jsonTbl/1990/1.json |   2 +
 .../src/test/resources/scan/jsonTbl/1991/2.json |   1 +
 .../main/codegen/templates/BasicTypeHelper.java |  21 +-
 .../main/codegen/templates/ValueHolders.java    |   8 +-
 .../drill/exec/record/MaterializedField.java    |  10 +
 .../drill/exec/vector/UntypedNullHolder.java    |  46 ++
 .../drill/exec/vector/UntypedNullVector.java    | 270 +++++++
 68 files changed, 2278 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 692d8f5..7c7026b 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -480,6 +480,10 @@ public class Types {
     return type.getMinorType() == MinorType.LATE;
   }
 
+  public static boolean isUntypedNull(final MajorType type) {
+    return type.getMinorType() == MinorType.NULL;
+  }
+
   public static MajorType withMode(final MinorType type, final DataMode mode) {
     return MajorType.newBuilder().setMode(mode).setMinorType(type).build();
   }
@@ -719,4 +723,9 @@ public class Types {
     }
     return typeBuilder;
   }
+
+  public static boolean isLaterType(MajorType type) {
+    return type.getMinorType() == MinorType.LATE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index d4a3f06..e770c96 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -55,7 +55,7 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
         throw new ExecutionSetupException(e);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
   private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 5921249..ca31767 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -124,13 +125,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
     if (disablePushdown) {
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
       includeId = true;
       return transformed;
     }
 
     if (isStarQuery()) {
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
       includeId = true;
       if (isSkipQuery()) {
     	// `SELECT COUNT(*)` query

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index e474c11..1ee1da8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -17,22 +17,17 @@
  */
 package org.apache.drill.exec.store.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -43,9 +38,9 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -55,18 +50,24 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
@@ -144,7 +145,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   public GroupScan clone(List<SchemaPath> columns) {
     HBaseGroupScan newScan = new HBaseGroupScan(this);
     newScan.columns = columns == null ? ALL_COLUMNS : columns;;
-    newScan.verifyColumns();
+    newScan.verifyColumnsAndConvertStar();
     return newScan;
   }
 
@@ -176,19 +177,37 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
     }
-    verifyColumns();
+    verifyColumnsAndConvertStar();
   }
 
-  private void verifyColumns() {
-    if (AbstractRecordReader.isStarQuery(columns)) {
-      return;
-    }
+  private void verifyColumnsAndConvertStar() {
+    boolean hasStarCol = false;
+    LinkedHashSet<SchemaPath> requestedColumns = new LinkedHashSet<>();
+
     for (SchemaPath column : columns) {
-      if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-        DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
-            column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+      // convert * into [row_key, cf1, cf2, ..., cf_n].
+      if (column.equals(Utilities.STAR_COLUMN)) {
+        hasStarCol = true;
+        Set<byte[]> families = hTableDesc.getFamiliesKeys();
+        requestedColumns.add(ROW_KEY_PATH);
+        for (byte[] family : families) {
+          SchemaPath colFamily = SchemaPath.getSimplePath(Bytes.toString(family));
+          requestedColumns.add(colFamily);
+        }
+      } else {
+        if (!(column.equals(ROW_KEY_PATH) ||
+            hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+          DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
+              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+        }
+        requestedColumns.add(column);
       }
     }
+
+    // since star column has been converted, reset this.cloumns.
+    if (hasStarCol) {
+      this.columns = new ArrayList<>(requestedColumns);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 3f308ce..d6c02b5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -126,8 +126,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
             HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()));
       }
     } else {
-      rowKeyOnly = false;
-      transformed.add(ROW_KEY_PATH);
+      throw new IllegalArgumentException("HBaseRecordReader does not allow column *. Column * should have been converted to list of <row_key, column family1, column family2, ..., column family_n");
+//      rowKeyOnly = false;
+//      transformed.add(ROW_KEY_PATH);
     }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 3a098fc..8e815b9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -50,7 +50,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 6c10d25..0e5314b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -33,12 +33,12 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -67,7 +67,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     final String partitionDesignator = context.getOptions()
         .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     List<Map<String, String>> implicitColumns = Lists.newLinkedList();
-    boolean selectAllQuery = AbstractRecordReader.isStarQuery(columns);
+    boolean selectAllQuery = Utilities.isStarQuery(columns);
 
     final boolean hasPartitions = (partitions != null && partitions.size() > 0);
 
@@ -173,7 +173,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 
-    return new ScanBatch(config, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(config, context, oContext, readers, implicitColumns);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index c6cc8a2..42fb3e2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -36,11 +36,11 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
 import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -241,7 +241,7 @@ public class HiveScan extends AbstractGroupScan {
 
   protected int getSerDeOverheadFactor() {
     final int projectedColumnCount;
-    if (AbstractRecordReader.isStarQuery(columns)) {
+    if (Utilities.isStarQuery(columns)) {
       Table hiveTable = hiveReadEntry.getTable();
       projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
     } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 47ea323..e287f68 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -92,6 +92,6 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
     } catch(Exception e) {
       logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
     }
-    return new ScanBatch(config, context, readers.iterator());
+    return new ScanBatch(config, context, readers);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
index fa44b55..1782e1a 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -37,6 +37,6 @@ public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
     Preconditions.checkArgument(children.isEmpty());
     JdbcStoragePlugin plugin = config.getPlugin();
     RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
-    return new ScanBatch(config, context, Collections.singletonList(reader).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index b3c2c4e..fc1db5d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -51,7 +51,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index c9ce5bb..77def0a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.bson.BsonRecordReader;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -109,7 +110,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     } else {
       // Tale all the fields including the _id
       this.fields.remove(DrillMongoConstants.ID);
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
     }
     return transformed;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 49b1750..9935184 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -60,7 +60,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
       }
     }
     logger.info("Number of record readers initialized : " + readers.size());
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 8390e30..5ccda85 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -89,6 +89,8 @@ public class TypeHelper extends BasicTypeHelper {
 </#list>
       case GENERIC_OBJECT:
         return model._ref(ObjectHolder.class);
+    case NULL:
+      return model._ref(UntypedNullHolder.class);
       default:
         break;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
index 7b58ecd..b0188ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -36,8 +36,19 @@ public class Project extends AbstractSingle{
 
   private final List<NamedExpression> exprs;
 
+  /**
+   * See {@link org.apache.drill.exec.planner.physical.ProjectPrel} for the meaning of flag 'outputProj'.
+   */
+  private boolean outputProj = false;
+
   @JsonCreator
-  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child, @JsonProperty("outputProj") boolean outputProj) {
+    super(child);
+    this.exprs = exprs;
+    this.outputProj = outputProj;
+  }
+
+  public Project(List<NamedExpression> exprs, PhysicalOperator child) {
     super(child);
     this.exprs = exprs;
   }
@@ -46,6 +57,14 @@ public class Project extends AbstractSingle{
     return exprs;
   }
 
+  /**
+   * @return true if Project is for the query's final output. Such a Project is added by TopProjectVisitor,
+   * to handle fast NONE when all the inputs to the query are empty and are skipped.
+   */
+  public boolean isOutputProj() {
+    return outputProj;
+  }
+
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
     return physicalVisitor.visitProject(this, value);
@@ -53,7 +72,7 @@ public class Project extends AbstractSingle{
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new Project(exprs, child);
+    return new Project(exprs, child, outputProj);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 803bd48..64be129 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,16 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -55,10 +52,11 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.common.map.CaseInsensitiveMap;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -78,50 +76,56 @@ public class ScanBatch implements CloseableRecordBatch {
   private BatchSchema schema;
   private final Mutator mutator;
   private boolean done = false;
-  private boolean hasReadNonEmptyFile = false;
-  private Map<String, ValueVector> implicitVectors;
   private Iterator<Map<String, String>> implicitColumns;
   private Map<String, String> implicitValues;
   private final BufferAllocator allocator;
-
+  private final List<Map<String, String>> implicitColumnList;
+  private String currentReaderClassName;
+  /**
+   *
+   * @param subScanConfig
+   * @param context
+   * @param oContext
+   * @param readerList
+   * @param implicitColumnList : either an empty list when none of the readers has implicit
+   *                        columns, or a list with a one-to-one mapping between readers and implicitColumns.
+   */
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
-                   OperatorContext oContext, Iterator<RecordReader> readers,
-                   List<Map<String, String>> implicitColumns) {
+                   OperatorContext oContext, List<RecordReader> readerList,
+                   List<Map<String, String>> implicitColumnList) {
     this.context = context;
-    this.readers = readers;
+    this.readers = readerList.iterator();
+    this.implicitColumns = implicitColumnList.iterator();
     if (!readers.hasNext()) {
       throw UserException.systemError(
           new ExecutionSetupException("A scan batch must contain at least one reader."))
         .build(logger);
     }
-    currentReader = readers.next();
+
     this.oContext = oContext;
     allocator = oContext.getAllocator();
     mutator = new Mutator(oContext, allocator, container);
 
+    oContext.getStats().startProcessing();
     try {
-      oContext.getStats().startProcessing();
-      currentReader.setup(oContext, mutator);
-    } catch (ExecutionSetupException e) {
-      try {
-        currentReader.close();
-      } catch(final Exception e2) {
-        logger.error("Close failed for reader " + currentReader.getClass().getSimpleName(), e2);
-      }
-      throw UserException.systemError(e)
-            .addContext("Setup failed for", currentReader.getClass().getSimpleName())
+      if (!verifyImplcitColumns(readerList.size(), implicitColumnList)) {
+        Exception ex = new ExecutionSetupException("Either implicit column list does not have same cardinality as reader list, "
+            + "or implicit columns are not same across all the record readers!");
+        throw UserException.systemError(ex)
+            .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
             .build(logger);
+      }
+
+      this.implicitColumnList = implicitColumnList;
+      addImplicitVectors();
+      currentReader = null;
     } finally {
       oContext.getStats().stopProcessing();
     }
-    this.implicitColumns = implicitColumns.iterator();
-    this.implicitValues = this.implicitColumns.hasNext() ? this.implicitColumns.next() : null;
-
-    addImplicitVectors();
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
-                   Iterator<RecordReader> readers)
+                   List<RecordReader> readers)
       throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig),
@@ -152,16 +156,6 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void releaseAssets() {
-    container.zeroVectors();
-  }
-
-  private void clearFieldVectorMap() {
-    for (final ValueVector v : mutator.fieldVectorMap().values()) {
-      v.clear();
-    }
-  }
-
   @Override
   public IterOutcome next() {
     if (done) {
@@ -169,82 +163,57 @@ public class ScanBatch implements CloseableRecordBatch {
     }
     oContext.getStats().startProcessing();
     try {
-      try {
+      while (true) {
+        if (currentReader == null && !getNextReaderIfHas()) {
+            releaseAssets(); // All data has been read. Release resource.
+            done = true;
+            return IterOutcome.NONE;
+        }
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
-
         currentReader.allocate(mutator.fieldVectorMap());
-      } catch (OutOfMemoryException e) {
-        clearFieldVectorMap();
-        throw UserException.memoryError(e).build(logger);
-      }
-      while ((recordCount = currentReader.next()) == 0) {
-        try {
-          if (!readers.hasNext()) {
-            // We're on the last reader, and it has no (more) rows.
-            currentReader.close();
-            releaseAssets();
-            done = true;  // have any future call to next() return NONE
 
-            if (mutator.isNewSchema()) {
-              // This last reader has a new schema (e.g., we have a zero-row
-              // file or other source).  (Note that some sources have a non-
-              // null/non-trivial schema even when there are no rows.)
-
-              container.buildSchema(SelectionVectorMode.NONE);
-              schema = container.getSchema();
-
-              return IterOutcome.OK_NEW_SCHEMA;
-            }
-            return IterOutcome.NONE;
-          }
-          // At this point, the reader that hit its end is not the last reader.
-
-          // If all the files we have read so far are just empty, the schema is not useful
-          if (! hasReadNonEmptyFile) {
-            container.clear();
-            clearFieldVectorMap();
-            mutator.clear();
-          }
+        recordCount = currentReader.next();
+        Preconditions.checkArgument(recordCount >= 0, "recordCount from RecordReader.next() should not be negative");
+        boolean isNewSchema = mutator.isNewSchema();
+        populateImplicitVectorsAndSetCount();
+        oContext.getStats().batchReceived(0, recordCount, isNewSchema);
 
+        if (recordCount == 0) {
           currentReader.close();
-          currentReader = readers.next();
-          implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
-          currentReader.setup(oContext, mutator);
-          try {
-            currentReader.allocate(mutator.fieldVectorMap());
-          } catch (OutOfMemoryException e) {
-            clearFieldVectorMap();
-            throw UserException.memoryError(e).build(logger);
-          }
-          addImplicitVectors();
-        } catch (ExecutionSetupException e) {
-          releaseAssets();
-          throw UserException.systemError(e).build(logger);
+          currentReader = null; // indicate currentReader is complete,
+                                // and fetch the next reader in the next loop iteration if required.
         }
-      }
-
-      // At this point, the current reader has read 1 or more rows.
-
-      hasReadNonEmptyFile = true;
-      populateImplicitVectors();
-
-      for (VectorWrapper<?> w : container) {
-        w.getValueVector().getMutator().setValueCount(recordCount);
-      }
 
-      // this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
-      final boolean isNewSchema = mutator.isNewSchema();
-      oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
+        if (isNewSchema) {
+          // Even when recordCount = 0, we should return OK_NEW_SCHEMA if current reader presents a new schema.
+          // This could happen when data sources have a non-trivial schema with 0 rows.
+          container.buildSchema(SelectionVectorMode.NONE);
+          schema = container.getSchema();
+          return IterOutcome.OK_NEW_SCHEMA;
+        }
 
-      if (isNewSchema) {
-        container.buildSchema(SelectionVectorMode.NONE);
-        schema = container.getSchema();
-        return IterOutcome.OK_NEW_SCHEMA;
-      } else {
-        return IterOutcome.OK;
+        // Handle case of same schema.
+        if (recordCount == 0) {
+            continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
+        } else {
+          // return OK if recordCount > 0 && ! isNewSchema
+          return IterOutcome.OK;
+        }
       }
     } catch (OutOfMemoryException ex) {
+      clearFieldVectorMap();
       throw UserException.memoryError(ex).build(logger);
+    } catch (ExecutionSetupException e) {
+      if (currentReader != null) {
+        try {
+          currentReader.close();
+        } catch (final Exception e2) {
+          logger.error("Close failed for reader " + currentReaderClassName, e2);
+        }
+      }
+      throw UserException.systemError(e)
+          .addContext("Setup failed for", currentReaderClassName)
+          .build(logger);
     } catch (Exception ex) {
       throw UserException.systemError(ex).build(logger);
     } finally {
@@ -252,21 +221,38 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
+  private void releaseAssets() {
+    container.zeroVectors();
+  }
+
+  private void clearFieldVectorMap() {
+    for (final ValueVector v : mutator.fieldVectorMap().values()) {
+      v.clear();
+    }
+    for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
+      v.clear();
+    }
+  }
+
+  private boolean getNextReaderIfHas() throws ExecutionSetupException {
+    if (readers.hasNext()) {
+      currentReader = readers.next();
+      implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
+      currentReader.setup(oContext, mutator);
+      currentReaderClassName = currentReader.getClass().getSimpleName();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   private void addImplicitVectors() {
     try {
-      if (implicitVectors != null) {
-        for (ValueVector v : implicitVectors.values()) {
-          v.clear();
-        }
-      }
-      implicitVectors = Maps.newHashMap();
-
-      if (implicitValues != null) {
-        for (String column : implicitValues.keySet()) {
+      if (!implicitColumnList.isEmpty()) {
+        for (String column : implicitColumnList.get(0).keySet()) {
           final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
           @SuppressWarnings("resource")
-          final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
-          implicitVectors.put(column, v);
+          final ValueVector v = mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/);
         }
       }
     } catch(SchemaChangeException e) {
@@ -277,24 +263,11 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void populateImplicitVectors() {
-    if (implicitValues != null) {
-      for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
-        @SuppressWarnings("resource")
-        final NullableVarCharVector v = (NullableVarCharVector) implicitVectors.get(entry.getKey());
-        String val;
-        if ((val = entry.getValue()) != null) {
-          AllocationHelper.allocate(v, recordCount, val.length());
-          final byte[] bytes = val.getBytes();
-          for (int j = 0; j < recordCount; j++) {
-            v.getMutator().setSafe(j, bytes, 0, bytes.length);
-          }
-          v.getMutator().setValueCount(recordCount);
-        } else {
-          AllocationHelper.allocate(v, recordCount, 0);
-          v.getMutator().setValueCount(recordCount);
-        }
-      }
+  private void populateImplicitVectorsAndSetCount() {
+    mutator.populateImplicitVectors(implicitValues, recordCount);
+    for (Map.Entry<String, ValueVector> entry: mutator.fieldVectorMap().entrySet()) {
+      logger.debug("set record count {} for vv {}", recordCount, entry.getKey());
+      entry.getValue().getMutator().setValueCount(recordCount);
     }
   }
 
@@ -329,14 +302,20 @@ public class ScanBatch implements CloseableRecordBatch {
 
   @VisibleForTesting
   public static class Mutator implements OutputMutator {
-    /** Whether schema has changed since last inquiry (via #isNewSchema}).  Is
-     *  true before first inquiry. */
-    private boolean schemaChanged = true;
-
-    /** Fields' value vectors indexed by fields' keys. */
-    private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+    /** Flag keeping track of whether the top-level schema has changed since the last inquiry (via {@link #isNewSchema}).
+     * It's initialized to false, or reset to false after {@link #isNewSchema} or after {@link #clear}, until a new value vector
+     * or a value vector with a different type is added to the regular field vector map.
+     **/
+    private boolean schemaChanged;
+
+    /** Regular fields' value vectors indexed by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> regularFieldVectorMap =
             CaseInsensitiveMap.newHashMap();
 
+    /** Implicit fields' value vectors indexed by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> implicitFieldVectorMap =
+        CaseInsensitiveMap.newHashMap();
+
     private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     private final BufferAllocator allocator;
 
@@ -348,46 +327,27 @@ public class ScanBatch implements CloseableRecordBatch {
       this.oContext = oContext;
       this.allocator = allocator;
       this.container = container;
+      this.schemaChanged = false;
     }
 
     public Map<String, ValueVector> fieldVectorMap() {
-      return fieldVectorMap;
+      return regularFieldVectorMap;
+    }
+
+    public Map<String, ValueVector> implicitFieldVectorMap() {
+      return implicitFieldVectorMap;
     }
 
     @SuppressWarnings("resource")
     @Override
     public <T extends ValueVector> T addField(MaterializedField field,
                                               Class<T> clazz) throws SchemaChangeException {
-      // Check if the field exists.
-      ValueVector v = fieldVectorMap.get(field.getName());
-      if (v == null || v.getClass() != clazz) {
-        // Field does not exist--add it to the map and the output container.
-        v = TypeHelper.getNewVector(field, allocator, callBack);
-        if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(
-            String.format(
-              "The class that was provided, %s, does not correspond to the "
-                + "expected vector type of %s.",
-              clazz.getSimpleName(), v.getClass().getSimpleName()));
-        }
-
-        final ValueVector old = fieldVectorMap.put(field.getName(), v);
-        if (old != null) {
-          old.clear();
-          container.remove(old);
-        }
-
-        container.add(v);
-        // Added new vectors to the container--mark that the schema has changed.
-        schemaChanged = true;
-      }
-
-      return clazz.cast(v);
+      return addField(field, clazz, false);
     }
 
     @Override
     public void allocate(int recordCount) {
-      for (final ValueVector v : fieldVectorMap.values()) {
+      for (final ValueVector v : regularFieldVectorMap.values()) {
         AllocationHelper.allocate(v, recordCount, 50, 10);
       }
     }
@@ -423,10 +383,82 @@ public class ScanBatch implements CloseableRecordBatch {
     }
 
     public void clear() {
-      fieldVectorMap.clear();
+      regularFieldVectorMap.clear();
+      implicitFieldVectorMap.clear();
+      schemaChanged = false;
+    }
+
+    private <T extends ValueVector> T addField(MaterializedField field,
+        Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
+      Map<String, ValueVector> fieldVectorMap;
+
+      if (isImplicitField) {
+        fieldVectorMap = implicitFieldVectorMap;
+      } else {
+        fieldVectorMap = regularFieldVectorMap;
+      }
+
+      if (!isImplicitField && implicitFieldVectorMap.containsKey(field.getName()) ||
+          isImplicitField && regularFieldVectorMap.containsKey(field.getName())) {
+        throw new SchemaChangeException(
+            String.format(
+                "It's not allowed to have regular field and implicit field share common name %s. "
+                    + "Either change regular field name in datasource, or change the default implicit field names.",
+                field.getName()));
+      }
+
+      // Check if the field exists.
+      ValueVector v = fieldVectorMap.get(field.getName());
+      if (v == null || v.getClass() != clazz) {
+        // Field does not exist--add it to the map and the output container.
+        v = TypeHelper.getNewVector(field, allocator, callBack);
+        if (!clazz.isAssignableFrom(v.getClass())) {
+          throw new SchemaChangeException(
+              String.format(
+                  "The class that was provided, %s, does not correspond to the "
+                      + "expected vector type of %s.",
+                  clazz.getSimpleName(), v.getClass().getSimpleName()));
+        }
+
+        final ValueVector old = fieldVectorMap.put(field.getName(), v);
+        if (old != null) {
+          old.clear();
+          container.remove(old);
+        }
+
+        container.add(v);
+        // Only mark schema change for regular vectors added to the container; implicit schema is constant.
+        if (!isImplicitField) {
+          schemaChanged = true;
+        }
+      }
+
+      return clazz.cast(v);
+    }
+
+    private void populateImplicitVectors(Map<String, String> implicitValues, int recordCount) {
+      if (implicitValues != null) {
+        for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
+          @SuppressWarnings("resource")
+          final NullableVarCharVector v = (NullableVarCharVector) implicitFieldVectorMap.get(entry.getKey());
+          String val;
+          if ((val = entry.getValue()) != null) {
+            AllocationHelper.allocate(v, recordCount, val.length());
+            final byte[] bytes = val.getBytes();
+            for (int j = 0; j < recordCount; j++) {
+              v.getMutator().setSafe(j, bytes, 0, bytes.length);
+            }
+            v.getMutator().setValueCount(recordCount);
+          } else {
+            AllocationHelper.allocate(v, recordCount, 0);
+            v.getMutator().setValueCount(recordCount);
+          }
+        }
+      }
     }
   }
 
+
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();
@@ -440,11 +472,10 @@ public class ScanBatch implements CloseableRecordBatch {
   @Override
   public void close() throws Exception {
     container.clear();
-    for (final ValueVector v : implicitVectors.values()) {
-      v.clear();
-    }
     mutator.clear();
-    currentReader.close();
+    if (currentReader != null) {
+      currentReader.close();
+    }
   }
 
   @Override
@@ -453,4 +484,34 @@ public class ScanBatch implements CloseableRecordBatch {
         String.format("You should not call getOutgoingContainer() for class %s",
                       this.getClass().getCanonicalName()));
   }
+
+  /**
+   * Verify list of implicit column values is valid input:
+   *   - Either implicit column list is empty;
+   *   - Or implicit column list has same size as reader list, and the key set is same across all the readers.
+   * @param numReaders
+   * @param implicitColumnList
+   * @return true if the implicit column list satisfies one of the conditions above, false otherwise
+   */
+  private boolean verifyImplcitColumns(int numReaders, List<Map<String, String>> implicitColumnList) {
+    if (implicitColumnList.isEmpty()) {
+      return true;
+    }
+
+    if (numReaders != implicitColumnList.size()) {
+      return false;
+    }
+
+    Map<String, String> firstMap = implicitColumnList.get(0);
+
+    for (int i = 1; i< implicitColumnList.size(); i++) {
+      Map<String, String> nonFirstMap = implicitColumnList.get(i);
+
+      if (!firstMap.keySet().equals(nonFirstMap.keySet())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index d2497f1..e77c186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -66,6 +67,8 @@ import com.google.common.base.Stopwatch;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
+import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
+
 public class TopNBatch extends AbstractRecordBatch<TopN> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
 
@@ -290,8 +293,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
     SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
-    SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
-    SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+    SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     if (copier == null) {
       copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
     } else {
@@ -391,8 +394,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     final VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
-    final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
-    final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+    final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
@@ -440,26 +443,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
 
-  public static class SimpleRecordBatch implements RecordBatch {
-
-    private VectorContainer container;
+  public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
     private SelectionVector4 sv4;
-    private FragmentContext context;
 
-    public SimpleRecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
-      this.container = container;
+    public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
+      super(container, context);
       this.sv4 = sv4;
-      this.context = context;
-    }
-
-    @Override
-    public FragmentContext getContext() {
-      return context;
-    }
-
-    @Override
-    public BatchSchema getSchema() {
-      return container.getSchema();
     }
 
     @Override
@@ -467,54 +456,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       if (sv4 != null) {
         return sv4.getCount();
       } else {
-        return container.getRecordCount();
+        return super.getRecordCount();
       }
     }
 
     @Override
-    public void kill(boolean sendUpstream) {
-    }
-
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public SelectionVector4 getSelectionVector4() {
       return sv4;
     }
-
-    @Override
-    public TypedFieldId getValueVectorId(SchemaPath path) {
-      return container.getValueVectorId(path);
-    }
-
-    @Override
-    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-      return container.getValueAccessorById(clazz, ids);
-    }
-
-    @Override
-    public IterOutcome next() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public WritableBatch getWritableBatch() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterator<VectorWrapper<?>> iterator() {
-      return container.iterator();
-    }
-
-    @Override
-    public VectorContainer getOutgoingContainer() {
-      throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
-    }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 1f74ba1..8c899aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -64,16 +65,10 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;
 
-public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
-  // Probe side record batch
-  private final RecordBatch left;
-
-  // Build side record batch
-  private final RecordBatch right;
-
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
 
@@ -145,9 +140,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   // indicates if we have previously returned an output batch
   boolean firstOutputBatch = true;
 
-  IterOutcome leftUpstream = IterOutcome.NONE;
-  IterOutcome rightUpstream = IterOutcome.NONE;
-
   private final HashTableStats htStats = new HashTableStats();
 
   public enum Metric implements MetricDef {
@@ -172,16 +164,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    leftUpstream = next(left);
-    rightUpstream = next(right);
-
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return;
-    }
-
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
+    if (! prefetchFirstBatchFromBothSides()) {
       return;
     }
 
@@ -503,11 +486,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
   }
 
-  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left,
-      RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context, true);
-    this.left = left;
-    this.right = right;
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+      RecordBatch left, /*Probe side record batch*/
+      RecordBatch right /*Build side record batch*/
+  ) throws OutOfMemoryException {
+    super(popConfig, context, true, left, right);
     joinType = popConfig.getJoinType();
     conditions = popConfig.getConditions();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e599702..a1b8dc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -151,6 +151,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       state = BatchState.OUT_OF_MEMORY;
       return;
     }
+
+    if (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      return;
+    }
+
     allocateBatch(true);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 35cc710..b390e41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -62,7 +63,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 /*
  * RecordBatch implementation for the nested loop join operator
  */
-public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> {
+public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
   // Maximum number records in the outgoing batch
@@ -72,24 +73,12 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
 
-  // Left input to the nested loop join operator
-  private final RecordBatch left;
-
   // Schema on the left side
   private BatchSchema leftSchema = null;
 
-  // state (IterOutcome) of the left input
-  private IterOutcome leftUpstream = IterOutcome.NONE;
-
-  // Right input to the nested loop join operator.
-  private final RecordBatch right;
-
   // Schema on the right side
   private BatchSchema rightSchema = null;
 
-  // state (IterOutcome) of the right input
-  private IterOutcome rightUpstream = IterOutcome.NONE;
-
   // Runtime generated class implementing the NestedLoopJoin interface
   private NestedLoopJoin nljWorker = null;
 
@@ -134,11 +123,9 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       EMIT_LEFT_CONSTANT, EMIT_LEFT);
 
   protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
-    this.left = left;
-    this.right = right;
   }
 
   /**
@@ -352,18 +339,8 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
    */
   @Override
   protected void buildSchema() throws SchemaChangeException {
-
     try {
-      leftUpstream = next(LEFT_INPUT, left);
-      rightUpstream = next(RIGHT_INPUT, right);
-
-      if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-        state = BatchState.STOP;
-        return;
-      }
-
-      if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-        state = BatchState.OUT_OF_MEMORY;
+      if (! prefetchFirstBatchFromBothSides()) {
         return;
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 9a72fcb..30efeec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,10 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.carrotsearch.hppc.IntHashSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.drill.common.expression.ConvertExpression;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -35,6 +35,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -51,24 +52,31 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
-import com.carrotsearch.hppc.IntHashSet;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -165,8 +173,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             // Only need to add the schema for the complex exprs because others should already have
             // been setup during setupNewSchema
             for (FieldReference fieldReference : complexFieldReferencesList) {
-              container.addOrGet(fieldReference.getRootSegment().getPath(),
-                  Types.required(MinorType.MAP), MapVector.class);
+              MaterializedField field = MaterializedField.create(fieldReference.getAsNamePart().getName(), UntypedNullHolder.TYPE);
+              container.add(new UntypedNullVector(field, container.getAllocator()));
             }
             container.buildSchema(SelectionVectorMode.NONE);
             wasNone = true;
@@ -302,8 +310,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
   }
 
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
         v.clear();
@@ -322,7 +329,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    cg.getCodeGenerator().saveCodeForDebugging(true);
+    //    cg.getCodeGenerator().saveCodeForDebugging(true);
 
     final IntHashSet transferFieldIds = new IntHashSet();
 
@@ -335,14 +342,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       result.clear();
 
       if (classify && namedExpression.getExpr() instanceof SchemaPath) {
-        classifyExpr(namedExpression, incoming, result);
+        classifyExpr(namedExpression, incomingBatch, result);
 
         if (result.isStar) {
           // The value indicates which wildcard we are processing now
           final Integer value = result.prefixMap.get(result.prefix);
           if (value != null && value == 1) {
             int k = 0;
-            for (final VectorWrapper<?> wrapper : incoming) {
+            for (final VectorWrapper<?> wrapper : incomingBatch) {
               final ValueVector vvIn = wrapper.getValueVector();
               if (k > result.outputNames.size() - 1) {
                 assert false;
@@ -363,7 +370,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             }
           } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
             int k = 0;
-            for (final VectorWrapper<?> wrapper : incoming) {
+            for (final VectorWrapper<?> wrapper : incomingBatch) {
               final ValueVector vvIn = wrapper.getValueVector();
               final SchemaPath originalPath = SchemaPath.getSimplePath(vvIn.getField().getName());
               if (k > result.outputNames.size() - 1) {
@@ -378,9 +385,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 continue;
               }
 
-              final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry());
+              final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incomingBatch, collector, context.getFunctionRegistry() );
               if (collector.hasErrors()) {
-                throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+                throw new SchemaChangeException(String.format("Failure while trying to materialize incomingBatch schema.  Errors:\n %s.", collector.toErrorString()));
               }
 
               final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
@@ -417,23 +424,23 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         }
       }
 
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming,
-              collector, context.getFunctionRegistry(), true, unionTypeEnabled);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incomingBatch,
+          collector, context.getFunctionRegistry(), true, unionTypeEnabled);
       final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
       if (collector.hasErrors()) {
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
       }
 
       // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
-      if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+      if (expr instanceof ValueVectorReadExpression && incomingBatch.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
           && !((ValueVectorReadExpression) expr).hasReadPath()
           && !isAnyWildcard
           && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
 
         final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
         final TypedFieldId id = vectorRead.getFieldId();
-        final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
-        Preconditions.checkNotNull(incoming);
+        final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+        Preconditions.checkNotNull(incomingBatch);
 
         final FieldReference ref = getRef(namedExpression);
         final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
@@ -473,7 +480,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
           if (!vectorRead.hasReadPath()) {
             final TypedFieldId id = vectorRead.getFieldId();
-            final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+            final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
             vvIn.makeTransferPair(vector);
           }
         }
@@ -485,12 +492,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
       codeGen.plainJavaCapable(true);
       // Uncomment out this line to debug the generated code.
-//      codeGen.saveCodeForDebugging(true);
+      //      codeGen.saveCodeForDebugging(true);
       this.projector = context.getImplementationClass(codeGen);
-      projector.setup(context, incoming, this, transfers);
+      projector.setup(context, incomingBatch, this, transfers);
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    setupNewSchemaFromInput(this.incoming);
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
@@ -624,11 +636,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     final int incomingSchemaSize = incoming.getSchema().getFieldCount();
 
-    // for debugging..
-    // if (incomingSchemaSize > 9) {
-    // assert false;
-    // }
-
     // input is '*' and output is 'prefix_*'
     if (exprIsStar && refHasPrefix && refEndsWithStar) {
       final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
@@ -768,4 +775,50 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       }
     }
   }
+
+  /**
+   * Handle null input specially when the Project operator is for query output. This happens when the input returns
+   * 0 batches (i.e. returns a FAST NONE directly).
+   *
+   * <p>
+   * Project operator has to return a batch with schema derived using the following 3 rules:
+   * </p>
+   * <ul>
+   *  <li>Case 1:  *  ==>  expand into an empty list of columns. </li>
+   *  <li>Case 2:  regular column reference ==> treat as nullable-int column </li>
+   *  <li>Case 3:  expressions ==> Call ExpressionTreeMaterializer over an empty vector container.
+   *           Once the expression is materialized without error, use the output type of materialized
+   *           expression. </li>
+   * </ul>
+   *
+   * <p>
+   * The batch is constructed with the above rules, with recordCount = 0,
+   * and is returned with OK_NEW_SCHEMA to the downstream operator.
+   * </p>
+   */
+  @Override
+  protected IterOutcome handleNullInput() {
+    if (! popConfig.isOutputProj()) {
+      return super.handleNullInput();
+    }
+
+    VectorContainer emptyVC = new VectorContainer();
+    emptyVC.buildSchema(SelectionVectorMode.NONE);
+    RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context);
+
+    try {
+      setupNewSchemaFromInput(emptyIncomingBatch);
+    } catch (SchemaChangeException e) {
+      kill(false);
+      logger.error("Failure during query", e);
+      context.fail(e);
+      return IterOutcome.STOP;
+    }
+
+    doAlloc(0);
+    container.buildSchema(SelectionVectorMode.NONE);
+    wasNone = true;
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
 }