You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ih...@apache.org on 2020/04/09 11:55:02 UTC
[drill] branch master updated: DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin
This is an automated email from the ASF dual-hosted git repository.
ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new eb08d04 DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin
eb08d04 is described below
commit eb08d0404e91031c7cbcc20c39dfc07e12e3fb5a
Author: Igor Guzenko <ih...@gmail.com>
AuthorDate: Thu Mar 5 14:14:25 2020 +0200
DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin
---
.../exec/hive/complex_types/TestHiveStructs.java | 19 ++++++++++
.../physical/impl/project/ProjectBatchBuilder.java | 13 +++++--
.../physical/impl/project/ProjectRecordBatch.java | 29 ++++++--------
.../physical/impl/union/UnionAllRecordBatch.java | 1 +
.../java/org/apache/drill/TestUntypedNull.java | 44 +++++++++++++---------
5 files changed, 66 insertions(+), 40 deletions(-)
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
index 3729e3a..0725038 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
@@ -22,10 +22,14 @@ import java.nio.file.Paths;
import org.apache.drill.categories.HiveStorageTest;
import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.hive.HiveClusterTest;
import org.apache.drill.exec.hive.HiveTestFixture;
import org.apache.drill.exec.hive.HiveTestUtilities;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.util.StoragePluginTestUtils;
import org.apache.drill.exec.util.Text;
@@ -234,6 +238,21 @@ public class TestHiveStructs extends HiveClusterTest {
.go();
}
+ @Test // DRILL-7429
+ public void testCorrectColumnOrdering() throws Exception {
+ BatchSchema expectedSchema = new BatchSchemaBuilder()
+ .withSchemaBuilder(new SchemaBuilder()
+ .addMap("a").resumeSchema()
+ .addNullable("b", TypeProtos.MinorType.INT))
+ .build();
+
+ String sql = "SELECT t.str_n0 a, rid b FROM hive.struct_tbl t LIMIT 1";
+ testBuilder()
+ .sqlQuery(sql)
+ .schemaBaseLine(expectedSchema)
+ .go();
+ }
+
@Test
public void primitiveStructWithOrdering() throws Exception {
testBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
index ce5cb72..efb40d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
@@ -17,9 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.project;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -32,10 +29,14 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.exec.vector.UntypedNullHolder;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Implements callbacks to build the physical vectors for the project
* record batch.
@@ -100,7 +101,11 @@ public class ProjectBatchBuilder implements ProjectionMaterializer.BatchBuilder
} else {
projectBatch.complexFieldReferencesList.clear();
}
-
+ // reserve place for complex field in container
+ ValueVector lateVv = container.addOrGet(
+ MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), UntypedNullHolder.TYPE),
+ callBack);
+ projectBatch.allocationVectors.add(lateVv);
// save the field reference for later for getting schema when input is empty
projectBatch.complexFieldReferencesList.add(ref);
projectBatch.memoryManager.addComplexField(null); // this will just add an estimate to the row width
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a4613a1..62bd0c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,11 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.project;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.ExecConstants;
@@ -29,21 +24,24 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.UntypedNullHolder;
-import org.apache.drill.exec.vector.UntypedNullVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
private static final Logger logger = LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -120,17 +118,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// since this is first batch and we already got a NONE, need to set up the schema
doAlloc(0);
setValueCount(0);
-
- // Only need to add the schema for the complex exprs because others should already have
- // been setup during setupNewSchema
- for (FieldReference fieldReference : complexFieldReferencesList) {
- MaterializedField field = MaterializedField.create(fieldReference.getAsNamePart().getName(),
- UntypedNullHolder.TYPE);
- container.add(new UntypedNullVector(field, container.getAllocator()));
- }
- container.buildSchema(SelectionVectorMode.NONE);
wasNone = true;
- return IterOutcome.OK_NEW_SCHEMA;
+ return IterOutcome.NONE;
} else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA && next != EMIT) {
return next;
} else if (next == IterOutcome.OK_NEW_SCHEMA) {
@@ -323,6 +312,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
@Override
protected IterOutcome handleNullInput() {
if (!popConfig.isOutputProj()) {
+ BatchSchema incomingSchema = incoming.getSchema();
+ if (incomingSchema != null && incomingSchema.getFieldCount() > 0) {
+ setupNewSchemaFromInput(incoming);
+ }
return super.handleNullInput();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ce68fa4..df2ffed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -198,6 +198,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
} else if (inField.getType().getMinorType() == TypeProtos.MinorType.NULL) {
+ index++;
continue;
} else { // Copy data in order to rename the column
SchemaPath inputPath = SchemaPath.getSimplePath(inField.getName());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
index edadcb2..a3d8532 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
@@ -32,9 +32,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.Arrays;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -78,24 +75,35 @@ public class TestUntypedNull extends ClusterTest {
}
@Test
- public void testTableCreation() throws Exception {
- String tablePrefix = "table_";
- List<String> formats = Arrays.asList("parquet", "json", "csv");
+ public void testParquetTableCreation() throws Exception {
+ testTableCreation("parquet");
+ }
+
+ @Test
+ public void testJsonTableCreation() throws Exception {
+ testTableCreation("json");
+ }
+
+ @Test
+ public void testCsvTableCreation() throws Exception {
+ testTableCreation("csv");
+ }
+
+
+ private void testTableCreation(String format) throws Exception {
+ String tableName = "table_" + format;
try {
- for (String format : formats) {
- client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, format);
- String query = String.format("create table dfs.tmp.%s%s as\n" +
- "select split(n_name, ' ') [1] from cp.`tpch/nation.parquet` where n_nationkey = -1 group by n_name",
- tablePrefix, format);
- QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
- assertTrue(summary.succeeded());
- assertEquals(1, summary.recordCount());
- }
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, format);
+ String query = "create table dfs.tmp." + tableName + " as\n" +
+ "select split(n_name, ' ') [1] from cp.`tpch/nation.parquet` where n_nationkey = -1 group by n_name";
+
+ QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+
+ assertTrue(summary.succeeded());
+ assertEquals(1, summary.recordCount());
} finally {
client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
- for (String format : formats) {
- queryBuilder().sql(String.format("drop table if exists dfs.tmp.%s%s", tablePrefix, format)).run();
- }
+ queryBuilder().sql("drop table if exists dfs.tmp." + tableName).run();
}
}