Posted to commits@drill.apache.org by vo...@apache.org on 2019/09/17 13:34:05 UTC

[drill] branch master updated: DRILL-7376: Drill ignores Hive schema for MaprDB tables when group scan has star column

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b1d8cba  DRILL-7376: Drill ignores Hive schema for MaprDB tables when group scan has star column
b1d8cba is described below

commit b1d8cba2c33f2c66fad20c309a3e86c644b6bb1b
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Sep 11 20:51:07 2019 +0300

    DRILL-7376: Drill ignores Hive schema for MaprDB tables when group scan has star column
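
    The reported case is, presumably, a star query over a Hive-backed MapR-DB
    JSON table, along the lines of SELECT * FROM hive.`maprdb_json_table`
    (hypothetical table name). As the diff below shows, when the group scan
    projected the star column the reader typed non-existent columns with the
    INT/VARCHAR defaults from ensureAtLeastOneField(), ignoring the schema
    declared in Hive; it now writes such columns through the Hive-provided
    TupleMetadata instead.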
---
 .../store/mapr/db/json/MaprDBJsonRecordReader.java |   6 +-
 ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java |   5 +-
 .../exec/vector/complex/fn/JsonReaderUtils.java    | 187 ++++++++++++++++-----
 .../src/main/codegen/templates/ComplexCopier.java  |  14 +-
 4 files changed, 167 insertions(+), 45 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 5e80e62..9819ba5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -448,7 +448,11 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     }
 
     if (nonExistentColumnsProjection && recordCount > 0) {
-      JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), schema, allTextMode, Collections.emptyList());
+      if (schema == null || schema.isEmpty()) {
+        JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.emptyList());
+      } else {
+        JsonReaderUtils.writeColumnsUsingSchema(vectorWriter, getColumns(), schema, allTextMode);
+      }
     }
     vectorWriter.setValueCount(recordCount);
     if (maxRecordsToRead > 0) {
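
The guard added above follows a simple pattern: fall back to default typing only
when no table schema is available. A minimal standalone sketch of that dispatch,
using hypothetical stand-ins for Drill's writer and schema types (not the actual
Drill API):

    import java.util.List;

    public class SchemaDispatchSketch {

      // Hypothetical stand-in for Drill's TupleMetadata.
      interface Schema {
        boolean isEmpty();
      }

      // Hypothetical hook for creating column writers.
      interface ColumnWriter {
        void writeDefault(String column, boolean allTextMode);
        void writeUsingSchema(String column, Schema schema);
      }

      static void ensureColumns(ColumnWriter writer, List<String> columns,
                                Schema schema, boolean allTextMode) {
        for (String column : columns) {
          if (schema == null || schema.isEmpty()) {
            // No schema available: default typing (VARCHAR in all-text mode, INT otherwise).
            writer.writeDefault(column, allTextMode);
          } else {
            // Schema available (e.g. declared in Hive): honor its column types.
            writer.writeUsingSchema(column, schema);
          }
        }
      }
    }
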
diff --git a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index f263e31..a67ba40 100644
--- a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -135,7 +135,10 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
     HiveToRelDataTypeConverter dataTypeConverter = new HiveToRelDataTypeConverter(typeFactory);
     TupleMetadata schema = new TupleSchema();
     hiveReadEntry.getTable().getColumnListsCache().getTableSchemaColumns()
-        .forEach(column -> schema.addColumn(HiveUtilities.getColumnMetadata(dataTypeConverter, column)));
+        .forEach(column ->
+            schema.addColumn(HiveUtilities.getColumnMetadata(
+                replaceOverriddenColumnId(parameters, column.getName()),
+                dataTypeConverter.convertToNullableRelDataType(column))));
 
     MapRDBFormatPluginConfig formatConfig = new MapRDBFormatPluginConfig();
 
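
The rule now converts each Hive column to Drill column metadata individually,
renaming identifiers that MapR-DB overrides (the replaceOverriddenColumnId()
call above, e.g. mapping a Hive key column onto the "_id" row key). A rough
JDK-only sketch of that per-column mapping, with hypothetical stand-in types
rather than Drill's HiveUtilities/TupleSchema API:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    public class HiveSchemaMappingSketch {

      // Hypothetical stand-in for a Hive column descriptor.
      static class HiveColumn {
        final String name;
        final String hiveType;

        HiveColumn(String name, String hiveType) {
          this.name = name;
          this.hiveType = hiveType;
        }
      }

      // Rename overridden identifiers, then register each column in
      // declaration order, as a TupleSchema would.
      static Map<String, String> toDrillSchema(Iterable<HiveColumn> hiveColumns,
                                               UnaryOperator<String> renameOverridden) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (HiveColumn column : hiveColumns) {
          schema.put(renameOverridden.apply(column.name), column.hiveType);
        }
        return schema;
      }
    }
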
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
index 0485e39..3255df6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector.complex.fn;
 
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
@@ -38,53 +36,25 @@ public class JsonReaderUtils {
                                     boolean allTextMode,
                                     List<BaseWriter.ListWriter> emptyArrayWriters) {
 
-    ensureAtLeastOneField(writer, columns, null /* schema */, allTextMode, emptyArrayWriters);
-  }
-
-  public static void ensureAtLeastOneField(BaseWriter.ComplexWriter writer,
-                                           Collection<SchemaPath> columns,
-                                           TupleMetadata schema,
-                                           boolean allTextMode,
-                                           List<BaseWriter.ListWriter> emptyArrayWriters) {
-
     List<BaseWriter.MapWriter> writerList = new ArrayList<>();
     List<PathSegment> fieldPathList = new ArrayList<>();
-    List<TypeProtos.MajorType> types = new ArrayList<>();
-    BitSet emptyStatus = new BitSet(columns.size());
+    BitSet emptyWriters = new BitSet(columns.size());
     int fieldIndex = 0;
 
     // first pass: collect which fields are empty
     for (SchemaPath schemaPath : columns) {
       PathSegment fieldPath = schemaPath.getRootSegment();
       BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
-      TupleMetadata columnMetadata = schema;
       while (fieldPath.getChild() != null && !fieldPath.getChild().isArray()) {
-        String name = fieldPath.getNameSegment().getPath();
-        if (columnMetadata != null) {
-          ColumnMetadata metadata = columnMetadata.metadata(name);
-          columnMetadata = metadata != null ? metadata.mapSchema() : null;
-        }
-        fieldWriter = fieldWriter.map(name);
+        fieldWriter = fieldWriter.map(fieldPath.getNameSegment().getPath());
         fieldPath = fieldPath.getChild();
       }
       writerList.add(fieldWriter);
       fieldPathList.add(fieldPath);
-      // for the case when field is absent in the schema, use VARCHAR type
-      // if allTextMode is enabled or INT type if it is disabled
-      TypeProtos.MajorType majorType = allTextMode
-          ? Types.optional(TypeProtos.MinorType.VARCHAR)
-          : Types.optional(TypeProtos.MinorType.INT);
-      ColumnMetadata metadata = null;
-      if (columnMetadata != null) {
-        metadata = columnMetadata.metadata(fieldPath.getNameSegment().getPath());
-        majorType = metadata != null ? metadata.majorType() : majorType;
+      if (fieldWriter.isEmptyMap()) {
+        emptyWriters.set(fieldIndex, true);
       }
-      types.add(majorType);
-      // for the case if metadata is specified, ensures that required fields are created
-      if (fieldWriter.isEmptyMap() || metadata != null) {
-        emptyStatus.set(fieldIndex, true);
-      }
-      if (fieldIndex == 0 && !allTextMode && schema == null) {
+      if (fieldIndex == 0 && !allTextMode) {
         // when allTextMode is false, there is not much benefit to producing all
         // the empty fields; just produce 1 field. The reason is that the type of the
         // fields is unknown, so if we produce multiple Integer fields by default, a
@@ -100,12 +70,16 @@ public class JsonReaderUtils {
     // second pass: create default typed vectors corresponding to empty fields
     // Note: this is not easily do-able in 1 pass because the same fieldWriter
     // may be shared by multiple fields whereas we want to keep track of all fields
-    // independently, so we rely on the emptyStatus.
+    // independently, so we rely on the emptyWriters.
     for (int j = 0; j < fieldPathList.size(); j++) {
       BaseWriter.MapWriter fieldWriter = writerList.get(j);
       PathSegment fieldPath = fieldPathList.get(j);
-      if (emptyStatus.get(j)) {
-        ComplexCopier.getMapWriterForType(types.get(j), fieldWriter, fieldPath.getNameSegment().getPath());
+      if (emptyWriters.get(j)) {
+        if (allTextMode) {
+          fieldWriter.varChar(fieldPath.getNameSegment().getPath());
+        } else {
+          fieldWriter.integer(fieldPath.getNameSegment().getPath());
+        }
       }
     }
 
@@ -120,4 +94,141 @@ public class JsonReaderUtils {
       }
     }
   }
+
+  /**
+   * Creates writers corresponding to the specified schema under the specified root writer.
+   *
+   * @param writer      parent writer for writers to create
+   * @param columns     collection of columns for which writers should be created
+   * @param schema      table schema
+   * @param allTextMode whether all primitive writers should be of varchar type
+   */
+  public static void writeColumnsUsingSchema(BaseWriter.ComplexWriter writer,
+      Collection<SchemaPath> columns, TupleMetadata schema, boolean allTextMode) {
+    BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+    for (SchemaPath column : columns) {
+      if (column.isDynamicStar()) {
+        writeSchemaColumns(schema, mapWriter, allTextMode);
+      } else {
+        ColumnMetadata columnMetadata = schema.metadata(column.getRootSegmentPath());
+        writeColumnToMapWriter(mapWriter, column.getRootSegment(), columnMetadata, allTextMode);
+      }
+    }
+  }
+
+  /**
+   * Creates a writer for the column which corresponds to the specified {@code PathSegment column},
+   * with its type taken from the specified {@code ColumnMetadata columnMetadata}.
+   * When the specified {@code PathSegment column} is a map, writers are created only for its child segments.
+   * When the specified {@code PathSegment column} is an array segment, all child writers are created.
+   *
+   * @param writer         parent writer for writers to create
+   * @param column         column for which writers should be created
+   * @param columnMetadata column metadata
+   * @param allTextMode    whether all primitive writers should be of varchar type
+   */
+  private static void writeColumnToMapWriter(BaseWriter.MapWriter writer,
+      PathSegment column, ColumnMetadata columnMetadata, boolean allTextMode) {
+    PathSegment child = column.getChild();
+    if (child != null && child.isNamed()) {
+      String name = column.getNameSegment().getPath();
+      ColumnMetadata childMetadata = columnMetadata.mapSchema().metadata(name);
+      writeColumnToMapWriter(writer.map(name), child, childMetadata, allTextMode);
+    } else {
+      writeSingleOrArrayColumn(columnMetadata, writer, allTextMode);
+    }
+  }
+
+  /**
+   * Creates writers for the specified {@code ColumnMetadata columnMetadata}.
+   * When the column is an array, creates a list writer and the required child writers.
+   *
+   * @param columnMetadata column metadata
+   * @param writer         parent writer for writers to create
+   * @param allTextMode    whether all primitive writers should be of varchar type
+   */
+  private static void writeSingleOrArrayColumn(ColumnMetadata columnMetadata,
+      BaseWriter.MapWriter writer, boolean allTextMode) {
+    if (columnMetadata.isArray()) {
+      writeArrayColumn(columnMetadata, writer.list(columnMetadata.name()), allTextMode);
+    } else {
+      writeColumn(columnMetadata, writer, allTextMode);
+    }
+  }
+
+  /**
+   * Creates writers for all columns taken from {@code TupleMetadata schema}.
+   *
+   * @param schema      table or map schema
+   * @param fieldWriter parent writer for writers to create
+   * @param allTextMode whether all primitive writers should be of varchar type
+   */
+  private static void writeSchemaColumns(TupleMetadata schema, BaseWriter.MapWriter fieldWriter,
+      boolean allTextMode) {
+    for (ColumnMetadata columnMetadata : schema) {
+      writeSingleOrArrayColumn(columnMetadata, fieldWriter, allTextMode);
+    }
+  }
+
+  /**
+   * Creates writers for the specified {@code ColumnMetadata columnMetadata}, including its children.
+   * For {@code TUPLE} or {@code MULTI_ARRAY} columns, all child writers are created recursively.
+   *
+   * @param columnMetadata column metadata
+   * @param fieldWriter    parent writer for writers to create
+   * @param allTextMode    whether all primitive writers should be of varchar type
+   */
+  private static void writeColumn(ColumnMetadata columnMetadata,
+      BaseWriter.MapWriter fieldWriter, boolean allTextMode) {
+    switch (columnMetadata.structureType()) {
+      case TUPLE:
+        writeSchemaColumns(
+            columnMetadata.mapSchema(), fieldWriter.map(columnMetadata.name()), allTextMode);
+        break;
+      case MULTI_ARRAY:
+        writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(columnMetadata.name()), allTextMode);
+        break;
+      case PRIMITIVE:
+        if (allTextMode) {
+          fieldWriter.varChar(columnMetadata.name());
+        } else {
+          ComplexCopier.getMapWriterForType(
+              columnMetadata.majorType(), fieldWriter, columnMetadata.name());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported type [%s] for column [%s].", columnMetadata.majorType(), columnMetadata.name()));
+    }
+  }
+
+  /**
+   * Writes the column which corresponds to the specified {@code ColumnMetadata columnMetadata}
+   * into the specified {@code BaseWriter.ListWriter fieldWriter}.
+   *
+   * @param columnMetadata column metadata
+   * @param fieldWriter    parent writer for writers to create
+   * @param allTextMode    whether all primitive writers should be of varchar type
+   */
+  private static void writeArrayColumn(ColumnMetadata columnMetadata,
+      BaseWriter.ListWriter fieldWriter, boolean allTextMode) {
+    switch (columnMetadata.structureType()) {
+      case TUPLE:
+        writeSchemaColumns(columnMetadata.mapSchema(), fieldWriter.map(), allTextMode);
+        break;
+      case MULTI_ARRAY:
+        writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(), allTextMode);
+        break;
+      case PRIMITIVE:
+        if (allTextMode) {
+          fieldWriter.varChar();
+        } else {
+          ComplexCopier.getListWriterForType(columnMetadata.majorType(), fieldWriter);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported type [%s] for column [%s].", columnMetadata.majorType(), columnMetadata.name()));
+    }
+  }
 }
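
The new writeColumnsUsingSchema()/writeColumn()/writeArrayColumn() trio is a
plain recursion over the schema tree: tuples recurse into their member columns,
arrays recurse into their element type, and primitives create a leaf writer
(forced to VARCHAR when all-text mode is on). The same recursion over a toy
schema model, with hypothetical types rather than Drill's writer API:

    public class SchemaRecursionSketch {

      // Toy schema node: a primitive, an array of one element type, or a tuple.
      static class Node {
        enum Kind { PRIMITIVE, ARRAY, TUPLE }
        final Kind kind;
        final String name;
        final String type;                   // primitive type name, if PRIMITIVE
        final java.util.List<Node> children; // element (ARRAY) or members (TUPLE)

        Node(Kind kind, String name, String type, java.util.List<Node> children) {
          this.kind = kind;
          this.name = name;
          this.type = type;
          this.children = children;
        }
      }

      // Mirrors writeColumn()/writeArrayColumn(): recurse until a primitive leaf.
      static void createWriters(Node node, String path, boolean allTextMode) {
        switch (node.kind) {
          case TUPLE:
            for (Node child : node.children) {
              createWriters(child, path + "." + child.name, allTextMode);
            }
            break;
          case ARRAY:
            createWriters(node.children.get(0), path + "[]", allTextMode);
            break;
          case PRIMITIVE:
            // All-text mode forces VARCHAR; otherwise honor the schema type.
            System.out.println(path + " -> " + (allTextMode ? "VARCHAR" : node.type));
            break;
        }
      }
    }
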
diff --git a/exec/vector/src/main/codegen/templates/ComplexCopier.java b/exec/vector/src/main/codegen/templates/ComplexCopier.java
index c61a268..1a669d8 100644
--- a/exec/vector/src/main/codegen/templates/ComplexCopier.java
+++ b/exec/vector/src/main/codegen/templates/ComplexCopier.java
@@ -128,12 +128,12 @@ public class ComplexCopier {
     case LIST:
       return (FieldWriter) writer.list(name);
     default:
-      throw new UnsupportedOperationException(type.toString());
+      throw new UnsupportedOperationException(String.format("[%s] type is not supported.", type.toString()));
     }
   }
 
-  private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter writer) {
-    switch (reader.getType().getMinorType()) {
+  public static FieldWriter getListWriterForType(TypeProtos.MajorType type, ListWriter writer) {
+    switch (type.getMinorType()) {
     <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
     <#assign fields = minor.fields!type.fields />
     <#assign uncappedName = name?uncap_first/>
@@ -142,7 +142,7 @@ public class ComplexCopier {
       return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}</#if>();
     <#elseif minor.class?contains("VarDecimal")>
     case ${name?upper_case}:
-      return (FieldWriter) writer.${uncappedName}(reader.getType().getPrecision(), reader.getType().getScale());
+      return (FieldWriter) writer.${uncappedName}(type.getPrecision(), type.getScale());
     </#if>
     </#list></#list>
     case MAP:
@@ -150,11 +150,15 @@ public class ComplexCopier {
     case LIST:
       return (FieldWriter) writer.list();
     default:
-      throw new UnsupportedOperationException(reader.getType().toString());
+      throw new UnsupportedOperationException(String.format("[%s] type is not supported.", type.toString()));
     }
   }
 
   private static FieldWriter getMapWriterForReader(FieldReader reader, BaseWriter.MapWriter writer, String name) {
     return getMapWriterForType(reader.getType(), writer, name);
   }
+
+  private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter writer) {
+    return getListWriterForType(reader.getType(), writer);
+  }
 }
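
The ComplexCopier change is the refactor that enables the schema-driven path:
the minor-type switch is keyed by a TypeProtos.MajorType instead of a
FieldReader, so callers that only hold a type (like JsonReaderUtils above) can
reuse it, and the reader-based method becomes a thin delegate. The same
extract-and-delegate move in miniature, with hypothetical names:

    public class DelegationSketch {

      interface Reader {
        String type();
      }

      // New public entry point keyed by the type itself...
      static String writerForType(String type) {
        switch (type) {
          case "INT":     return "IntWriter";
          case "VARCHAR": return "VarCharWriter";
          default: throw new UnsupportedOperationException(
              String.format("[%s] type is not supported.", type));
        }
      }

      // ...while the old reader-based entry point simply delegates.
      static String writerForReader(Reader reader) {
        return writerForType(reader.type());
      }
    }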