You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/05/07 23:19:58 UTC

[1/2] drill git commit: DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results

Repository: drill
Updated Branches:
  refs/heads/master d12bee05a -> 8c706e6fa


DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results

- changed ParquetRecordWriter to avoid creating the parquet file until the first row of data is available
- Moved unit tests into a separate test class that starts 3 drillbits, to test the case where multiple fragments are attempting to write empty parquet files
- changed BaseTestQuery to update the storage plugin in all started bits and not just the first one


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/868ce4de
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/868ce4de
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/868ce4de

Branch: refs/heads/master
Commit: 868ce4de25a9ed2153de96a6504272be9d1f4a2a
Parents: d12bee0
Author: adeneche <ad...@gmail.com>
Authored: Mon Apr 20 13:03:07 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu May 7 13:51:17 2015 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetRecordWriter.java |  77 ++++--------
 .../apache/drill/exec/util/TestUtilities.java   |  21 ++--
 .../java/org/apache/drill/BaseTestQuery.java    |  10 +-
 .../physical/impl/writer/TestParquetWriter.java |  43 -------
 .../writer/TestParquetWriterEmptyFiles.java     | 118 +++++++++++++++++++
 .../apache/drill/jdbc/DrillConnectionImpl.java  |   3 +-
 6 files changed, 166 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 8615eb7..621f05c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
@@ -64,7 +63,7 @@ import parquet.schema.Type.Repetition;
 import com.google.common.collect.Lists;
 
 public class ParquetRecordWriter extends ParquetOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
 
   private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -72,12 +71,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
-  private Map<String, String> extraMetaData = new HashMap();
+  private Map<String, String> extraMetaData = new HashMap<>();
   private int blockSize;
-  private int pageSize = 1 * 1024 * 1024;
+  private int pageSize = 1024 * 1024;
   private int dictionaryPageSize = pageSize;
   private boolean enableDictionary = false;
-  private boolean validating = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
   private DirectCodecFactory codecFactory;
@@ -96,7 +94,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
-  private ParquetDirectByteBufferAllocator allocator;
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
@@ -152,10 +149,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
     schema = new MessageType("root", types);
 
-    Path fileName = getPath();
-    parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
-    parquetFileWriter.start();
-
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
     pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
         codecFactory.getCompressor(codec, pageSize),
@@ -163,18 +156,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
         initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
+    MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
   }
 
-  /**
-   * @return Path for the latest file created
-   */
-  private Path getPath() {
-    return new Path(location, prefix + "_" + index + ".parquet");
-  }
-
   private PrimitiveType getPrimitiveType(MaterializedField field) {
     MinorType minorType = field.getType().getMinorType();
     String name = field.getLastName();
@@ -204,12 +190,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   private void flush() throws IOException {
-    parquetFileWriter.startBlock(recordCount);
-    store.flush();
-    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-    recordCount = 0;
-    parquetFileWriter.endBlock();
-    parquetFileWriter.end(extraMetaData);
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      store.flush();
+      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+
+      // we are writing one single block per file
+      parquetFileWriter.end(extraMetaData);
+      parquetFileWriter = null;
+    }
+
     store.close();
     ColumnChunkPageWriteStoreExposer.close(pageStore);
     store = null;
@@ -307,7 +299,16 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   @Override
   public void endRecord() throws IOException {
     consumer.endMessage();
+
+    // we wait until there is at least one record before creating the parquet file
+    if (parquetFileWriter == null) {
+      Path path = new Path(location, prefix + "_" + index + ".parquet");
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+      parquetFileWriter.start();
+    }
+
     recordCount++;
+
     checkBlockSizeReached();
   }
 
@@ -317,34 +318,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    boolean hasRecords = recordCount > 0;
-    if (hasRecords) {
-      parquetFileWriter.startBlock(recordCount);
-      store.flush();
-      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-      parquetFileWriter.end(extraMetaData);
-    }
-    if (store != null) {
-      store.close();
-    }
-    if (pageStore != null) {
-      ColumnChunkPageWriteStoreExposer.close(pageStore);
-    }
+    flush();
 
     codecFactory.close();
-
-    if (!hasRecords) {
-      // the very last file is empty, delete it (DRILL-2408)
-      Path path = getPath();
-      logger.debug("no record written, deleting parquet file {}", path);
-      FileSystem fs = path.getFileSystem(conf);
-      if (fs.exists(path)) {
-        if (!fs.delete(path, false)) {
-          throw new DrillRuntimeException("Couldn't delete empty file " + path);
-        }
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index a1fcc2a..cb687af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -44,29 +44,36 @@ public class TestUtilities {
   private static final String dfsTestTmpSchema = "tmp";
 
   /**
+   * Creates a temporary folder and marks it for deletion when the JVM exits
+   *
+   * @return absolute path to temporary folder
+   */
+  public static String createTempDir() {
+    final File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    return tmpDir.getAbsolutePath();
+  }
+
+  /**
    * Update the location of dfs_test.tmp location. Get the "dfs_test.tmp" workspace and update the location with an
    * exclusive temp directory just for use in the current test jvm.
    *
    * @param pluginRegistry
    * @return JVM exclusive temporary directory location.
    */
-  public static String updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry)
+  public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry,
+                                                      final String tmpDirPath)
       throws ExecutionSetupException {
     final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName);
     final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
     final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema);
 
-    final File tmpDir = Files.createTempDir();
-    tmpDir.deleteOnExit();
-    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDir.getAbsolutePath(),
-        true, tmpWSConfig.getDefaultInputFormat());
+    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDirPath, true, tmpWSConfig.getDefaultInputFormat());
 
     pluginConfig.workspaces.remove(dfsTestTmpSchema);
     pluginConfig.workspaces.put(dfsTestTmpSchema, newTmpWSConfig);
 
     pluginRegistry.createOrUpdate(dfsTestPluginName, pluginConfig, true);
-
-    return tmpDir.getAbsolutePath();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 200bbc6..f8ec090 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -166,15 +166,17 @@ public class BaseTestQuery extends ExecTest {
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
 
+    dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
     bits = new Drillbit[drillbitCount];
     for(int i = 0; i < drillbitCount; i++) {
       bits[i] = new Drillbit(config, serviceSet);
       bits[i].run();
-    }
 
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
-    dfsTestTmpSchemaLocation = TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
-    TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+      final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+    }
 
     client = QueryTestUtil.createClient(config,  serviceSet, MAX_WIDTH_PER_NODE, null);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 5670e1e..4a41669 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.writer;
 
-import java.io.File;
 import java.math.BigDecimal;
 import java.sql.Date;
 
@@ -28,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -445,22 +443,6 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
-
-  @Test // see DRILL-2408
-  public void testWriteEmptyFile() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file";
-
-    try {
-      Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
-      //    test("ALTER SESSION SET `planner.add_producer_consumer` = false");
-      test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
-
-      Assert.assertEquals(fs.listStatus(path).length, 0);
-    } finally {
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   @Test // DRILL-2341
   public void tableSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
     final String newTblName = "testTableOutputSchema";
@@ -531,31 +513,6 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
-  @Test // see DRILL-2408
-  public void testWriteEmptyFileAfterFlush() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
-
-    try {
-      // this specific value will force a flush just after the final row is written
-      // this will cause the creation of a new "empty" parquet file
-      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
-
-      String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
-
-      // this query will fail if the "empty" file wasn't deleted
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      // restore the session option
-      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   private static void deleteTableIfExists(String tableName) {
     try {
       Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
new file mode 100644
index 0000000..2848b68
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParquetWriterEmptyFiles extends BaseTestQuery {
+
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+
+    fs = FileSystem.get(conf);
+
+    updateTestCluster(3, null);
+  }
+
+  @Test // see DRILL-2408
+  public void testWriteEmptyFile() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_testwriteemptyfile";
+
+    try {
+      test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
+
+      final Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
+      Assert.assertFalse(fs.exists(path));
+    } finally {
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  @Test
+  public void testMultipleWriters() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+
+    runSQL("alter session set `planner.slice_target` = 1");
+
+    try {
+      final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id IN (15, 16) GROUP BY position_id";
+      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+      // this query will fail if an "empty" file was created
+      testBuilder()
+        .unOrdered()
+        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      runSQL("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  @Test // see DRILL-2408
+  public void testWriteEmptyFileAfterFlush() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    deleteTableIfExists(outputFile);
+
+    try {
+      // this specific value will force a flush just after the final row is written
+      // this may cause the creation of a new "empty" parquet file
+      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
+
+      final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
+      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+      // this query will fail if an "empty" file was created
+      testBuilder()
+        .unOrdered()
+        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      // restore the session option
+      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  private static boolean deleteTableIfExists(String tableName) {
+    try {
+      Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
+      if (fs.exists(path)) {
+        return fs.delete(path, true);
+      }
+    } catch (Exception e) {
+      // ignore exceptions.
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index c73eb50..74c6655 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -332,7 +332,8 @@ public abstract class DrillConnectionImpl extends AvaticaConnection
   private static void makeTmpSchemaLocationsUnique(StoragePluginRegistry pluginRegistry, Properties props) {
     try {
       if (props != null && "true".equalsIgnoreCase(props.getProperty("drillJDBCUnitTests"))) {
-        TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
+        final String tmpDirPath = TestUtilities.createTempDir();
+        TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tmpDirPath);
         TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
       }
     } catch(Throwable e) {


[2/2] drill git commit: DRILL-2376: In UnionAllRecordBatch, the mechanism to detect schema change is corrected

Posted by am...@apache.org.
DRILL-2376: In UnionAllRecordBatch, the mechanism to detect schema change is corrected


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8c706e6f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8c706e6f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8c706e6f

Branch: refs/heads/master
Commit: 8c706e6fa44e100cef6c117ddeceb238d150e89d
Parents: 868ce4d
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Thu Apr 23 13:44:13 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu May 7 14:03:41 2015 -0700

----------------------------------------------------------------------
 .../impl/union/UnionAllRecordBatch.java         |  4 +--
 .../java/org/apache/drill/TestUnionAll.java     | 32 ++++++++++++++++++--
 .../testframework/testUnionAllQueries/q18.tsv   | 15 ---------
 .../testframework/testUnionAllQueries/q18_1.tsv | 15 +++++++++
 .../testframework/testUnionAllQueries/q18_2.tsv | 13 ++++++++
 .../testframework/testUnionAllQueries/q18_3.tsv | 13 ++++++++
 6 files changed, 72 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index d7ea3bb..66bc3e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -400,9 +400,9 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
                 throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
               }
 
-              upstream = IterOutcome.OK;
+              iterOutcome = IterOutcome.OK;
               // fall through
-              case OK:
+            case OK:
               unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
               upstream = iterOutcome;
               return upstream;

http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 72f52e9..5f98d90 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -375,15 +375,41 @@ public class TestUnionAll extends BaseTestQuery{
     String rootDate = FileUtils.getResourceAsFile("/store/json/dateData.json").toURI().toString();
     String rootTimpStmp = FileUtils.getResourceAsFile("/store/json/timeStmpData.json").toURI().toString();
 
-    String query = String.format(
+    String query1 = String.format(
         "(select max(key) as key from dfs_test.`%s` " +
         "union all " +
         "select key from dfs_test.`%s`)", rootDate, rootTimpStmp);
 
+    String query2 = String.format(
+        "select key from dfs_test.`%s` " +
+        "union all " +
+        "select max(key) as key from dfs_test.`%s`", rootDate, rootTimpStmp);
+
+    String query3 = String.format(
+        "select key from dfs_test.`%s` " +
+        "union all " +
+        "select max(key) as key from dfs_test.`%s`", rootDate, rootTimpStmp);
+
     testBuilder()
-        .sqlQuery(query)
+        .sqlQuery(query1)
+        .unOrdered()
+        .csvBaselineFile("testframework/testUnionAllQueries/q18_1.tsv")
+        .baselineTypes(TypeProtos.MinorType.VARCHAR)
+        .baselineColumns("key")
+        .build().run();
+
+    testBuilder()
+        .sqlQuery(query2)
+        .unOrdered()
+        .csvBaselineFile("testframework/testUnionAllQueries/q18_2.tsv")
+        .baselineTypes(TypeProtos.MinorType.VARCHAR)
+        .baselineColumns("key")
+        .build().run();
+
+    testBuilder()
+        .sqlQuery(query3)
         .unOrdered()
-        .csvBaselineFile("testframework/testUnionAllQueries/q18.tsv")
+        .csvBaselineFile("testframework/testUnionAllQueries/q18_3.tsv")
         .baselineTypes(TypeProtos.MinorType.VARCHAR)
         .baselineColumns("key")
         .build().run();

http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
deleted file mode 100644
index ccf0d35..0000000
--- a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
+++ /dev/null
@@ -1,15 +0,0 @@
-2011-07-26
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
new file mode 100644
index 0000000..ccf0d35
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
@@ -0,0 +1,15 @@
+2011-07-26
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
new file mode 100644
index 0000000..123efc6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
@@ -0,0 +1,13 @@
+2009-03-03
+2001-08-27
+2011-07-26
+1970-09-02
+1983-04-24
+2007-02-01
+1977-08-03
+1962-05-14
+1950-02-16
+1983-09-05
+2000-09-09
+1960-08-18
+2015-03-26 19:04:55.544
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
new file mode 100644
index 0000000..123efc6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
@@ -0,0 +1,13 @@
+2009-03-03
+2001-08-27
+2011-07-26
+1970-09-02
+1983-04-24
+2007-02-01
+1977-08-03
+1962-05-14
+1950-02-16
+1983-09-05
+2000-09-09
+1960-08-18
+2015-03-26 19:04:55.544
\ No newline at end of file