You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text export.
Posted to commits@drill.apache.org by ih...@apache.org on 2020/04/09 11:55:02 UTC

[drill] branch master updated: DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin

This is an automated email from the ASF dual-hosted git repository.

ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new eb08d04  DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin
eb08d04 is described below

commit eb08d0404e91031c7cbcc20c39dfc07e12e3fb5a
Author: Igor Guzenko <ih...@gmail.com>
AuthorDate: Thu Mar 5 14:14:25 2020 +0200

    DRILL-7429: Wrong column order when selecting complex data using Hive storage plugin
---
 .../exec/hive/complex_types/TestHiveStructs.java   | 19 ++++++++++
 .../physical/impl/project/ProjectBatchBuilder.java | 13 +++++--
 .../physical/impl/project/ProjectRecordBatch.java  | 29 ++++++--------
 .../physical/impl/union/UnionAllRecordBatch.java   |  1 +
 .../java/org/apache/drill/TestUntypedNull.java     | 44 +++++++++++++---------
 5 files changed, 66 insertions(+), 40 deletions(-)

diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
index 3729e3a..0725038 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
@@ -22,10 +22,14 @@ import java.nio.file.Paths;
 
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.hive.HiveClusterTest;
 import org.apache.drill.exec.hive.HiveTestFixture;
 import org.apache.drill.exec.hive.HiveTestUtilities;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.exec.util.Text;
@@ -234,6 +238,21 @@ public class TestHiveStructs extends HiveClusterTest {
         .go();
   }
 
+  @Test // DRILL-7429
+  public void testCorrectColumnOrdering() throws Exception {
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+        .withSchemaBuilder(new SchemaBuilder()
+            .addMap("a").resumeSchema()
+            .addNullable("b", TypeProtos.MinorType.INT))
+        .build();
+
+    String sql = "SELECT t.str_n0 a, rid b FROM hive.struct_tbl t LIMIT 1";
+    testBuilder()
+        .sqlQuery(sql)
+        .schemaBaseLine(expectedSchema)
+        .go();
+  }
+
   @Test
   public void primitiveStructWithOrdering() throws Exception {
     testBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
index ce5cb72..efb40d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -32,10 +29,14 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.exec.vector.UntypedNullHolder;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Implements callbacks to build the physical vectors for the project
  * record batch.
@@ -100,7 +101,11 @@ public class ProjectBatchBuilder implements ProjectionMaterializer.BatchBuilder
     } else {
       projectBatch.complexFieldReferencesList.clear();
     }
-
+    // reserve place for complex field in container
+    ValueVector lateVv = container.addOrGet(
+        MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), UntypedNullHolder.TYPE),
+        callBack);
+    projectBatch.allocationVectors.add(lateVv);
     // save the field reference for later for getting schema when input is empty
     projectBatch.complexFieldReferencesList.add(ref);
     projectBatch.memoryManager.addComplexField(null); // this will just add an estimate to the row width
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a4613a1..62bd0c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,11 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.ExecConstants;
@@ -29,21 +24,24 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.UntypedNullHolder;
-import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private static final Logger logger = LoggerFactory.getLogger(ProjectRecordBatch.class);
 
@@ -120,17 +118,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             // since this is first batch and we already got a NONE, need to set up the schema
             doAlloc(0);
             setValueCount(0);
-
-            // Only need to add the schema for the complex exprs because others should already have
-            // been setup during setupNewSchema
-            for (FieldReference fieldReference : complexFieldReferencesList) {
-              MaterializedField field = MaterializedField.create(fieldReference.getAsNamePart().getName(),
-                      UntypedNullHolder.TYPE);
-              container.add(new UntypedNullVector(field, container.getAllocator()));
-            }
-            container.buildSchema(SelectionVectorMode.NONE);
             wasNone = true;
-            return IterOutcome.OK_NEW_SCHEMA;
+            return IterOutcome.NONE;
           } else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA && next != EMIT) {
             return next;
           } else if (next == IterOutcome.OK_NEW_SCHEMA) {
@@ -323,6 +312,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   @Override
   protected IterOutcome handleNullInput() {
     if (!popConfig.isOutputProj()) {
+      BatchSchema incomingSchema = incoming.getSchema();
+      if (incomingSchema != null && incomingSchema.getFieldCount() > 0) {
+        setupNewSchemaFromInput(incoming);
+      }
       return super.handleNullInput();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ce68fa4..df2ffed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -198,6 +198,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
         TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
       } else if (inField.getType().getMinorType() == TypeProtos.MinorType.NULL) {
+        index++;
         continue;
       } else { // Copy data in order to rename the column
         SchemaPath inputPath = SchemaPath.getSimplePath(inField.getName());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
index edadcb2..a3d8532 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
@@ -32,9 +32,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Arrays;
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -78,24 +75,35 @@ public class TestUntypedNull extends ClusterTest {
   }
 
   @Test
-  public void testTableCreation() throws Exception {
-    String tablePrefix = "table_";
-    List<String> formats = Arrays.asList("parquet", "json", "csv");
+  public void testParquetTableCreation() throws Exception {
+    testTableCreation("parquet");
+  }
+
+  @Test
+  public void testJsonTableCreation() throws Exception {
+    testTableCreation("json");
+  }
+
+  @Test
+  public void testCsvTableCreation() throws Exception {
+    testTableCreation("csv");
+  }
+
+
+  private void testTableCreation(String format) throws Exception {
+    String tableName = "table_" + format;
     try {
-      for (String format : formats) {
-        client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, format);
-        String query = String.format("create table dfs.tmp.%s%s as\n" +
-          "select split(n_name, ' ') [1] from cp.`tpch/nation.parquet` where n_nationkey = -1 group by n_name",
-          tablePrefix, format);
-        QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
-        assertTrue(summary.succeeded());
-        assertEquals(1, summary.recordCount());
-      }
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, format);
+      String query = "create table dfs.tmp." + tableName + " as\n" +
+          "select split(n_name, ' ') [1] from cp.`tpch/nation.parquet` where n_nationkey = -1 group by n_name";
+
+      QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+
+      assertTrue(summary.succeeded());
+      assertEquals(1, summary.recordCount());
     } finally {
       client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
-      for (String format : formats) {
-        queryBuilder().sql(String.format("drop table if exists dfs.tmp.%s%s", tablePrefix, format)).run();
-      }
+      queryBuilder().sql("drop table if exists dfs.tmp." + tableName).run();
     }
   }