Posted to commits@drill.apache.org by am...@apache.org on 2015/05/07 23:19:58 UTC
[1/2] drill git commit: DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results
Repository: drill
Updated Branches:
refs/heads/master d12bee05a -> 8c706e6fa
DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results
- changed ParquetRecordWriter to avoid creating the Parquet file until the first row of data is available (a sketch of this lazy-creation pattern follows below)
- moved the unit tests into a separate test class that starts 3 drillbits, to cover the case where multiple fragments attempt to write empty Parquet files
- changed BaseTestQuery to update the storage plugin in all started drillbits, not just the first one
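In essence, the writer now defers file creation to endRecord(): no output file exists until the first record arrives, so a fragment that never receives a row never touches the filesystem and there is no empty file to delete afterwards. A minimal, self-contained sketch of that lazy-creation pattern (hypothetical LazyWriter class, plain java.nio standing in for the Parquet API):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LazyWriter implements AutoCloseable {
  private final Path path;
  private BufferedWriter out; // stays null until the first record arrives

  public LazyWriter(Path path) {
    this.path = path;
  }

  public void writeRecord(String record) throws IOException {
    if (out == null) {
      // first record: only now is the file actually created
      out = Files.newBufferedWriter(path);
    }
    out.write(record);
    out.newLine();
  }

  @Override
  public void close() throws IOException {
    // if no record was ever written, no file exists and nothing needs deleting
    if (out != null) {
      out.close();
    }
  }
}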
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/868ce4de
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/868ce4de
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/868ce4de
Branch: refs/heads/master
Commit: 868ce4de25a9ed2153de96a6504272be9d1f4a2a
Parents: d12bee0
Author: adeneche <ad...@gmail.com>
Authored: Mon Apr 20 13:03:07 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu May 7 13:51:17 2015 -0700
----------------------------------------------------------------------
.../exec/store/parquet/ParquetRecordWriter.java | 77 ++++--------
.../apache/drill/exec/util/TestUtilities.java | 21 ++--
.../java/org/apache/drill/BaseTestQuery.java | 10 +-
.../physical/impl/writer/TestParquetWriter.java | 43 -------
.../writer/TestParquetWriterEmptyFiles.java | 118 +++++++++++++++++++
.../apache/drill/jdbc/DrillConnectionImpl.java | 3 +-
6 files changed, 166 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 8615eb7..621f05c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
@@ -64,7 +63,7 @@ import parquet.schema.Type.Repetition;
import com.google.common.collect.Lists;
public class ParquetRecordWriter extends ParquetOutputRecordWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -72,12 +71,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
- private Map<String, String> extraMetaData = new HashMap();
+ private Map<String, String> extraMetaData = new HashMap<>();
private int blockSize;
- private int pageSize = 1 * 1024 * 1024;
+ private int pageSize = 1024 * 1024;
private int dictionaryPageSize = pageSize;
private boolean enableDictionary = false;
- private boolean validating = false;
private CompressionCodecName codec = CompressionCodecName.SNAPPY;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
private DirectCodecFactory codecFactory;
@@ -96,7 +94,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private String prefix;
private int index = 0;
private OperatorContext oContext;
- private ParquetDirectByteBufferAllocator allocator;
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
super();
@@ -152,10 +149,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
schema = new MessageType("root", types);
- Path fileName = getPath();
- parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
- parquetFileWriter.start();
-
int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
codecFactory.getCompressor(codec, pageSize),
@@ -163,18 +156,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
initialBlockBufferSize);
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
- MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
+ MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
setUp(schema, consumer);
}
- /**
- * @return Path for the latest file created
- */
- private Path getPath() {
- return new Path(location, prefix + "_" + index + ".parquet");
- }
-
private PrimitiveType getPrimitiveType(MaterializedField field) {
MinorType minorType = field.getType().getMinorType();
String name = field.getLastName();
@@ -204,12 +190,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
private void flush() throws IOException {
- parquetFileWriter.startBlock(recordCount);
- store.flush();
- ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
- recordCount = 0;
- parquetFileWriter.endBlock();
- parquetFileWriter.end(extraMetaData);
+ if (recordCount > 0) {
+ parquetFileWriter.startBlock(recordCount);
+ store.flush();
+ ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
+ recordCount = 0;
+ parquetFileWriter.endBlock();
+
+ // we are writing one single block per file
+ parquetFileWriter.end(extraMetaData);
+ parquetFileWriter = null;
+ }
+
store.close();
ColumnChunkPageWriteStoreExposer.close(pageStore);
store = null;
@@ -307,7 +299,16 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void endRecord() throws IOException {
consumer.endMessage();
+
+ // we wait until there is at least one record before creating the parquet file
+ if (parquetFileWriter == null) {
+ Path path = new Path(location, prefix + "_" + index + ".parquet");
+ parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+ parquetFileWriter.start();
+ }
+
recordCount++;
+
checkBlockSizeReached();
}
@@ -317,34 +318,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void cleanup() throws IOException {
- boolean hasRecords = recordCount > 0;
- if (hasRecords) {
- parquetFileWriter.startBlock(recordCount);
- store.flush();
- ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
- recordCount = 0;
- parquetFileWriter.endBlock();
- parquetFileWriter.end(extraMetaData);
- }
- if (store != null) {
- store.close();
- }
- if (pageStore != null) {
- ColumnChunkPageWriteStoreExposer.close(pageStore);
- }
+ flush();
codecFactory.close();
-
- if (!hasRecords) {
- // the very last file is empty, delete it (DRILL-2408)
- Path path = getPath();
- logger.debug("no record written, deleting parquet file {}", path);
- FileSystem fs = path.getFileSystem(conf);
- if (fs.exists(path)) {
- if (!fs.delete(path, false)) {
- throw new DrillRuntimeException("Couldn't delete empty file " + path);
- }
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index a1fcc2a..cb687af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -44,29 +44,36 @@ public class TestUtilities {
private static final String dfsTestTmpSchema = "tmp";
/**
+ * Creates a temporary folder and marks it for deletion on exit
+ *
+ * @return absolute path to temporary folder
+ */
+ public static String createTempDir() {
+ final File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ return tmpDir.getAbsolutePath();
+ }
+
+ /**
* Update the location of dfs_test.tmp location. Get the "dfs_test.tmp" workspace and update the location with an
* exclusive temp directory just for use in the current test jvm.
*
* @param pluginRegistry
* @return JVM exclusive temporary directory location.
*/
- public static String updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry)
+ public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry,
+ final String tmpDirPath)
throws ExecutionSetupException {
final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName);
final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema);
- final File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
- final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDir.getAbsolutePath(),
- true, tmpWSConfig.getDefaultInputFormat());
+ final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDirPath, true, tmpWSConfig.getDefaultInputFormat());
pluginConfig.workspaces.remove(dfsTestTmpSchema);
pluginConfig.workspaces.put(dfsTestTmpSchema, newTmpWSConfig);
pluginRegistry.createOrUpdate(dfsTestPluginName, pluginConfig, true);
-
- return tmpDir.getAbsolutePath();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 200bbc6..f8ec090 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -166,15 +166,17 @@ public class BaseTestQuery extends ExecTest {
serviceSet = RemoteServiceSet.getLocalServiceSet();
}
+ dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
bits = new Drillbit[drillbitCount];
for(int i = 0; i < drillbitCount; i++) {
bits[i] = new Drillbit(config, serviceSet);
bits[i].run();
- }
- final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
- dfsTestTmpSchemaLocation = TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
- TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+ final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
+ TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+ TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+ }
client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, null);
}
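The net effect of the BaseTestQuery change above: the temporary directory is created once, and every drillbit's dfs_test.tmp workspace is pointed at that same path, instead of only the first bit's registry being updated. A JDK-only sketch of the pattern, with hypothetical names standing in for the drillbit loop:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SharedTempDirExample {
  public static void main(String[] args) throws IOException {
    // create the directory once, up front (mirrors TestUtilities.createTempDir)
    final Path tmpDir = Files.createTempDirectory("dfs_test_tmp");
    tmpDir.toFile().deleteOnExit(); // same cleanup strategy as the real helper

    for (int i = 0; i < 3; i++) {
      // in BaseTestQuery this is where each drillbit's StoragePluginRegistry
      // is updated so that dfs_test.tmp resolves to tmpDir
      System.out.println("drillbit " + i + " -> dfs_test.tmp = " + tmpDir);
    }
  }
}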
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 5670e1e..4a41669 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.writer;
-import java.io.File;
import java.math.BigDecimal;
import java.sql.Date;
@@ -28,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
@@ -445,22 +443,6 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
-
- @Test // see DRILL-2408
- public void testWriteEmptyFile() throws Exception {
- String outputFile = "testparquetwriter_test_write_empty_file";
-
- try {
- Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
- // test("ALTER SESSION SET `planner.add_producer_consumer` = false");
- test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
-
- Assert.assertEquals(fs.listStatus(path).length, 0);
- } finally {
- deleteTableIfExists(outputFile);
- }
- }
-
@Test // DRILL-2341
public void tableSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
final String newTblName = "testTableOutputSchema";
@@ -531,31 +513,6 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
- @Test // see DRILL-2408
- public void testWriteEmptyFileAfterFlush() throws Exception {
- String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
-
- try {
- // this specific value will force a flush just after the final row is written
- // this will cause the creation of a new "empty" parquet file
- test("ALTER SESSION SET `store.parquet.block-size` = 19926");
-
- String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
- test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
-
- // this query will fail if the "empty" file wasn't deleted
- testBuilder()
- .unOrdered()
- .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
- .sqlBaselineQuery(query)
- .go();
- } finally {
- // restore the session option
- test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
- deleteTableIfExists(outputFile);
- }
- }
-
private static void deleteTableIfExists(String tableName) {
try {
Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
new file mode 100644
index 0000000..2848b68
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParquetWriterEmptyFiles extends BaseTestQuery {
+
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void initFs() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+
+ fs = FileSystem.get(conf);
+
+ updateTestCluster(3, null);
+ }
+
+ @Test // see DRILL-2408
+ public void testWriteEmptyFile() throws Exception {
+ final String outputFile = "testparquetwriteremptyfiles_testwriteemptyfile";
+
+ try {
+ test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
+
+ final Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
+ Assert.assertFalse(fs.exists(path));
+ } finally {
+ deleteTableIfExists(outputFile);
+ }
+ }
+
+ @Test
+ public void testMultipleWriters() throws Exception {
+ final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+
+ runSQL("alter session set `planner.slice_target` = 1");
+
+ try {
+ final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id IN (15, 16) GROUP BY position_id";
+ test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+ // this query will fail if an "empty" file was created
+ testBuilder()
+ .unOrdered()
+ .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+ .sqlBaselineQuery(query)
+ .go();
+ } finally {
+ runSQL("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ deleteTableIfExists(outputFile);
+ }
+ }
+
+ @Test // see DRILL-2408
+ public void testWriteEmptyFileAfterFlush() throws Exception {
+ final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+ deleteTableIfExists(outputFile);
+
+ try {
+ // this specific value will force a flush just after the final row is written
+ // this may cause the creation of a new "empty" parquet file
+ test("ALTER SESSION SET `store.parquet.block-size` = 19926");
+
+ final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
+ test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+ // this query will fail if an "empty" file was created
+ testBuilder()
+ .unOrdered()
+ .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+ .sqlBaselineQuery(query)
+ .go();
+ } finally {
+ // restore the session option
+ test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
+ deleteTableIfExists(outputFile);
+ }
+ }
+
+ private static boolean deleteTableIfExists(String tableName) {
+ try {
+ Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
+ if (fs.exists(path)) {
+ return fs.delete(path, true);
+ }
+ } catch (Exception e) {
+ // ignore exceptions.
+ return false;
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index c73eb50..74c6655 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -332,7 +332,8 @@ public abstract class DrillConnectionImpl extends AvaticaConnection
private static void makeTmpSchemaLocationsUnique(StoragePluginRegistry pluginRegistry, Properties props) {
try {
if (props != null && "true".equalsIgnoreCase(props.getProperty("drillJDBCUnitTests"))) {
- TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
+ final String tmpDirPath = TestUtilities.createTempDir();
+ TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tmpDirPath);
TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
}
} catch(Throwable e) {
[2/2] drill git commit: DRILL-2376: In UnionAllRecordBatch, the mechanism to detect schema change is corrected
Posted by am...@apache.org.
DRILL-2376: In UnionAllRecordBatch, the mechanism to detect schema change is corrected
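The fix itself is a single assignment, visible in the diff below: inside a switch fall-through, the code normalized the wrong variable (upstream instead of iterOutcome), so the OK case immediately overwrote the corrected value with the stale outcome. A standalone illustration of that fall-through pitfall, using simplified, hypothetical names:

enum IterOutcome { OK, OK_NEW_SCHEMA }

class FallThroughExample {
  IterOutcome normalize(IterOutcome iterOutcome, boolean schemaActuallyChanged) {
    switch (iterOutcome) {
      case OK_NEW_SCHEMA:
        if (schemaActuallyChanged) {
          throw new IllegalStateException("schema change not supported here");
        }
        // Before the fix this read `upstream = IterOutcome.OK;`, which the
        // fall-through below immediately overwrote via `upstream = iterOutcome`.
        // Normalizing iterOutcome itself makes the fall-through return OK.
        iterOutcome = IterOutcome.OK;
        // fall through
      case OK:
        return iterOutcome;
      default:
        throw new IllegalStateException("unexpected outcome: " + iterOutcome);
    }
  }
}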
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8c706e6f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8c706e6f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8c706e6f
Branch: refs/heads/master
Commit: 8c706e6fa44e100cef6c117ddeceb238d150e89d
Parents: 868ce4d
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Thu Apr 23 13:44:13 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu May 7 14:03:41 2015 -0700
----------------------------------------------------------------------
.../impl/union/UnionAllRecordBatch.java | 4 +--
.../java/org/apache/drill/TestUnionAll.java | 32 ++++++++++++++++++--
.../testframework/testUnionAllQueries/q18.tsv | 15 ---------
.../testframework/testUnionAllQueries/q18_1.tsv | 15 +++++++++
.../testframework/testUnionAllQueries/q18_2.tsv | 13 ++++++++
.../testframework/testUnionAllQueries/q18_3.tsv | 13 ++++++++
6 files changed, 72 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index d7ea3bb..66bc3e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -400,9 +400,9 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
}
- upstream = IterOutcome.OK;
+ iterOutcome = IterOutcome.OK;
// fall through
- case OK:
+ case OK:
unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
upstream = iterOutcome;
return upstream;
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 72f52e9..5f98d90 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -375,15 +375,41 @@ public class TestUnionAll extends BaseTestQuery{
String rootDate = FileUtils.getResourceAsFile("/store/json/dateData.json").toURI().toString();
String rootTimpStmp = FileUtils.getResourceAsFile("/store/json/timeStmpData.json").toURI().toString();
- String query = String.format(
+ String query1 = String.format(
"(select max(key) as key from dfs_test.`%s` " +
"union all " +
"select key from dfs_test.`%s`)", rootDate, rootTimpStmp);
+ String query2 = String.format(
+ "select key from dfs_test.`%s` " +
+ "union all " +
+ "select max(key) as key from dfs_test.`%s`", rootDate, rootTimpStmp);
+
+ String query3 = String.format(
+ "select key from dfs_test.`%s` " +
+ "union all " +
+ "select max(key) as key from dfs_test.`%s`", rootDate, rootTimpStmp);
+
testBuilder()
- .sqlQuery(query)
+ .sqlQuery(query1)
+ .unOrdered()
+ .csvBaselineFile("testframework/testUnionAllQueries/q18_1.tsv")
+ .baselineTypes(TypeProtos.MinorType.VARCHAR)
+ .baselineColumns("key")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query2)
+ .unOrdered()
+ .csvBaselineFile("testframework/testUnionAllQueries/q18_2.tsv")
+ .baselineTypes(TypeProtos.MinorType.VARCHAR)
+ .baselineColumns("key")
+ .build().run();
+
+ testBuilder()
+ .sqlQuery(query3)
.unOrdered()
- .csvBaselineFile("testframework/testUnionAllQueries/q18.tsv")
+ .csvBaselineFile("testframework/testUnionAllQueries/q18_3.tsv")
.baselineTypes(TypeProtos.MinorType.VARCHAR)
.baselineColumns("key")
.build().run();
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
deleted file mode 100644
index ccf0d35..0000000
--- a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv
+++ /dev/null
@@ -1,15 +0,0 @@
-2011-07-26
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.542
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.543
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
-2015-03-26 19:04:55.544
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
new file mode 100644
index 0000000..ccf0d35
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_1.tsv
@@ -0,0 +1,15 @@
+2011-07-26
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.542
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.543
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
+2015-03-26 19:04:55.544
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
new file mode 100644
index 0000000..123efc6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_2.tsv
@@ -0,0 +1,13 @@
+2009-03-03
+2001-08-27
+2011-07-26
+1970-09-02
+1983-04-24
+2007-02-01
+1977-08-03
+1962-05-14
+1950-02-16
+1983-09-05
+2000-09-09
+1960-08-18
+2015-03-26 19:04:55.544
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/8c706e6f/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
new file mode 100644
index 0000000..123efc6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18_3.tsv
@@ -0,0 +1,13 @@
+2009-03-03
+2001-08-27
+2011-07-26
+1970-09-02
+1983-04-24
+2007-02-01
+1977-08-03
+1962-05-14
+1950-02-16
+1983-09-05
+2000-09-09
+1960-08-18
+2015-03-26 19:04:55.544
\ No newline at end of file