Posted to commits@drill.apache.org by dz...@apache.org on 2022/07/26 01:55:59 UTC

[drill] branch master updated: DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c10116bf1 DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)
9c10116bf1 is described below

commit 9c10116bf1a06023ad9053ad774564d0f2264c0b
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Jul 26 04:55:54 2022 +0300

    DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)
---
 .../codegen/templates/AbstractRecordWriter.java    |   7 +
 .../codegen/templates/EventBasedRecordWriter.java  |   6 +-
 .../templates/ParquetOutputRecordWriter.java       | 368 +++++++++------------
 .../src/main/codegen/templates/RecordWriter.java   |   6 +
 .../exec/store/parquet/ParquetRecordWriter.java    |  32 +-
 .../physical/impl/writer/TestParquetWriter.java    |  19 ++
 6 files changed, 220 insertions(+), 218 deletions(-)
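
In short: when the source data contains a MAP column with no children (for
example, a JSON field that is always an empty object), the Parquet writer
previously emitted a group with no fields, which the Parquet library does
not accept. After this commit the writer skips such columns, and skips the
record message entirely when no writable fields remain. A sketch of the
scenario, based on the new test case added at the bottom of this diff (file
and table names are illustrative):

    $ cat emptyMap.json
    {"sample": {}, "a": "a"}

    apache drill> CREATE TABLE dfs.tmp.t1 AS SELECT * FROM dfs.`emptyMap.json`;
    apache drill> SELECT * FROM dfs.tmp.t1;
    +---+
    | a |
    +---+
    | a |
    +---+

The "sample" column is dropped from the Parquet table rather than failing
the CTAS; the result shown above mirrors the baseline asserted in
testResultWithEmptyMap.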

diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6982c7586e..d26d35aa81 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -24,6 +24,8 @@ import java.lang.UnsupportedOperationException;
 package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.BitVector.Accessor;
@@ -96,4 +98,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public void postProcessing() throws IOException {
     // no op
   }
+
+  @Override
+  public boolean supportsField(MaterializedField field) {
+    return !field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD);
+  }
 }
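
The supportsField method added above (declared on the RecordWriter
interface further down in this diff) is the extension point the rest of the
commit builds on: EventBasedRecordWriter and ParquetRecordWriter.newSchema
now ask the writer which fields it can serialize instead of hard-coding the
partition-comparator check. A hypothetical subclass, not part of this
commit, could narrow the rule further; declaring it abstract keeps the
sketch compilable without restating the other RecordWriter methods:

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.MaterializedField;
    import org.apache.drill.exec.store.AbstractRecordWriter;

    public abstract class NoUnionRecordWriter extends AbstractRecordWriter {
      @Override
      public boolean supportsField(MaterializedField field) {
        // Keep the superclass rule (drop the internal partition
        // comparator column) and additionally refuse UNION columns.
        return super.supportsField(field)
            && field.getType().getMinorType() != MinorType.UNION;
      }
    }
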
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index a541c63fd6..0d2672e861 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.drill.exec.planner.physical.WriterPrel;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" />
@@ -27,7 +26,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.complex.impl.UnionReader;
@@ -81,7 +79,7 @@ public class EventBasedRecordWriter {
     try {
       int fieldId = 0;
       for (VectorWrapper w : batch) {
-        if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+        if (!recordWriter.supportsField(w.getField())) {
           continue;
         }
         FieldReader reader = w.getValueVector().getReader();
@@ -178,4 +176,4 @@ public class EventBasedRecordWriter {
     }
     throw new UnsupportedOperationException();
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 1f6e467873..3658ecc82f 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -94,15 +94,157 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
 
   protected abstract PrimitiveType getPrimitiveType(MaterializedField field);
 
+  public abstract class BaseFieldConverter extends FieldConverter {
+
+    public BaseFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    public abstract void read();
+
+    public abstract void read(int i);
+
+    public abstract void consume();
+
+    @Override
+    public void writeField() throws IOException {
+      read();
+      consume();
+    }
+  }
+
+  public class NullableFieldConverter extends FieldConverter {
+    private BaseFieldConverter delegate;
+
+    public NullableFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+      super(fieldId, fieldName, reader);
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      if (!reader.isSet()) {
+        return;
+      }
+      consumer.startField(fieldName, fieldId);
+      delegate.writeField();
+      consumer.endField(fieldName, fieldId);
+    }
+
+    public void setPosition(int index) {
+      delegate.setPosition(index);
+    }
+
+    public void startField() throws IOException {
+      delegate.startField();
+    }
+
+    public void endField() throws IOException {
+      delegate.endField();
+    }
+  }
+
+  public class RequiredFieldConverter extends FieldConverter {
+    private BaseFieldConverter delegate;
+
+    public RequiredFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+      super(fieldId, fieldName, reader);
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      consumer.startField(fieldName, fieldId);
+      delegate.writeField();
+      consumer.endField(fieldName, fieldId);
+    }
+
+    public void setPosition(int index) {
+      delegate.setPosition(index);
+    }
+
+    public void startField() throws IOException {
+      delegate.startField();
+    }
+
+    public void endField() throws IOException {
+      delegate.endField();
+    }
+  }
+
+  public class RepeatedFieldConverter extends FieldConverter {
+
+    private BaseFieldConverter delegate;
+
+    public RepeatedFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+      super(fieldId, fieldName, reader);
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
+      if (reader.size() == 0) {
+        return;
+      }
+      consumer.startField(fieldName, fieldId);
+      for (int i = 0; i < reader.size(); i++) {
+        delegate.read(i);
+        delegate.consume();
+      }
+      consumer.endField(fieldName, fieldId);
+    }
+
+    @Override
+    public void writeListField() {
+      if (reader.size() == 0) {
+        return;
+      }
+      consumer.startField(LIST, ZERO_IDX);
+      for (int i = 0; i < reader.size(); i++) {
+        consumer.startGroup();
+        consumer.startField(ELEMENT, ZERO_IDX);
+
+        delegate.read(i);
+        delegate.consume();
+
+        consumer.endField(ELEMENT, ZERO_IDX);
+        consumer.endGroup();
+      }
+      consumer.endField(LIST, ZERO_IDX);
+    }
+
+    public void setPosition(int index) {
+      delegate.setPosition(index);
+    }
+
+    public void startField() throws IOException {
+      delegate.startField();
+    }
+
+    public void endField() throws IOException {
+      delegate.endField();
+    }
+  }
+
 <#list vv.types as type>
   <#list type.minor as minor>
     <#list vv.modes as mode>
   @Override
   public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
-    return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader);
+    BaseFieldConverter converter = new ${minor.class}ParquetConverter(fieldId, fieldName, reader);
+  <#if mode.prefix == "Nullable">
+    return new NullableFieldConverter(fieldId, fieldName, reader, converter);
+  <#elseif mode.prefix == "Repeated">
+    return new RepeatedFieldConverter(fieldId, fieldName, reader, converter);
+  <#else>
+    return new RequiredFieldConverter(fieldId, fieldName, reader, converter);
+  </#if>
   }
 
-  public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter {
+    </#list>
+
+  public class ${minor.class}ParquetConverter extends BaseFieldConverter {
     private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
     <#if minor.class?contains("Interval")>
     private final byte[] output = new byte[12];
@@ -110,7 +252,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
     private final DecimalValueWriter decimalValueWriter;
     </#if>
 
-    public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+    public ${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
       super(fieldId, fieldName, reader);
       <#if minor.class == "VarDecimal">
       decimalValueWriter = DecimalValueWriter.
@@ -119,20 +261,17 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
     }
 
     @Override
-    public void writeField() throws IOException {
-  <#if mode.prefix == "Nullable" >
-      if (!reader.isSet()) {
-        return;
-      }
-  <#elseif mode.prefix == "Repeated" >
-    // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
-    if (reader.size() == 0) {
-      return;
+    public void read() {
+      reader.read(holder);
+    }
+
+    @Override
+    public void read(int i) {
+      reader.read(i, holder);
     }
-    consumer.startField(fieldName, fieldId);
-    for (int i = 0; i < reader.size(); i++) {
-  </#if>
 
+    @Override
+    public void consume() {
   <#if  minor.class == "TinyInt" ||
         minor.class == "UInt1" ||
         minor.class == "UInt2" ||
@@ -141,80 +280,28 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
         minor.class == "Time" ||
         minor.class == "Decimal9" ||
         minor.class == "UInt4">
-    <#if mode.prefix == "Repeated" >
-            reader.read(i, holder);
-            consumer.addInteger(holder.value);
-    <#else>
-    consumer.startField(fieldName, fieldId);
-    reader.read(holder);
-    consumer.addInteger(holder.value);
-    consumer.endField(fieldName, fieldId);
-    </#if>
+      consumer.addInteger(holder.value);
   <#elseif
         minor.class == "Float4">
-      <#if mode.prefix == "Repeated" >
-              reader.read(i, holder);
-              consumer.addFloat(holder.value);
-      <#else>
-    consumer.startField(fieldName, fieldId);
-    reader.read(holder);
-    consumer.addFloat(holder.value);
-    consumer.endField(fieldName, fieldId);
-      </#if>
+      consumer.addFloat(holder.value);
   <#elseif
         minor.class == "BigInt" ||
         minor.class == "Decimal18" ||
         minor.class == "TimeStamp" ||
         minor.class == "UInt8">
-      <#if mode.prefix == "Repeated" >
-              reader.read(i, holder);
-              consumer.addLong(holder.value);
-      <#else>
-    consumer.startField(fieldName, fieldId);
-    reader.read(holder);
-    consumer.addLong(holder.value);
-    consumer.endField(fieldName, fieldId);
-      </#if>
+      consumer.addLong(holder.value);
   <#elseif minor.class == "Date">
-    <#if mode.prefix == "Repeated" >
-      reader.read(i, holder);
-      consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
-    <#else>
-      consumer.startField(fieldName, fieldId);
-      reader.read(holder);
       // convert from internal Drill date format to Julian Day centered around Unix Epoc
       consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
-      consumer.endField(fieldName, fieldId);
-    </#if>
   <#elseif
         minor.class == "Float8">
-      <#if mode.prefix == "Repeated" >
-              reader.read(i, holder);
-              consumer.addDouble(holder.value);
-      <#else>
-    consumer.startField(fieldName, fieldId);
-    reader.read(holder);
-    consumer.addDouble(holder.value);
-    consumer.endField(fieldName, fieldId);
-      </#if>
+      consumer.addDouble(holder.value);
   <#elseif
         minor.class == "Bit">
-      <#if mode.prefix == "Repeated" >
-              reader.read(i, holder);
-              consumer.addBoolean(holder.value == 1);
-      <#else>
-    consumer.startField(fieldName, fieldId);
-    reader.read(holder);
-    consumer.addBoolean(holder.value == 1);
-    consumer.endField(fieldName, fieldId);
-      </#if>
+      consumer.addBoolean(holder.value == 1);
   <#elseif
         minor.class == "Decimal28Sparse" ||
         minor.class == "Decimal38Sparse">
-      <#if mode.prefix == "Repeated" >
-      <#else>
-      consumer.startField(fieldName, fieldId);
-      reader.read(holder);
       byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
               holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
       byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
@@ -225,11 +312,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
       }
       System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
       consumer.addBinary(Binary.fromByteArray(output));
-      consumer.endField(fieldName, fieldId);
-      </#if>
   <#elseif minor.class?contains("Interval")>
-      consumer.startField(fieldName, fieldId);
-      reader.read(holder);
       <#if minor.class == "IntervalDay">
         Arrays.fill(output, 0, 4, (byte) 0);
         IntervalUtility.intToLEByteArray(holder.days, output, 4);
@@ -244,143 +327,16 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
         IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
       </#if>
       consumer.addBinary(Binary.fromByteArray(output));
-      consumer.endField(fieldName, fieldId);
-
-  <#elseif
-        minor.class == "TimeTZ" ||
-        minor.class == "Decimal28Dense" ||
-        minor.class == "Decimal38Dense">
-      <#if mode.prefix == "Repeated" >
-      <#else>
-
-      </#if>
-  <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
-        || minor.class == "VarBinary" || minor.class == "VarDecimal">
-    <#if mode.prefix == "Repeated">
-      reader.read(i, holder);
-      <#if minor.class == "VarDecimal">
-      decimalValueWriter.writeValue(consumer, holder.buffer,
-          holder.start, holder.end, reader.getField().getPrecision());
-      <#else>
-      consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
-      </#if>
-    <#else>
-      reader.read(holder);
-      consumer.startField(fieldName, fieldId);
-      <#if minor.class == "VarDecimal">
+  <#elseif minor.class == "VarDecimal">
       decimalValueWriter.writeValue(consumer, holder.buffer,
           holder.start, holder.end, reader.getField().getPrecision());
-      <#else>
+  <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
+        || minor.class == "VarBinary">
       consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
-      </#if>
-      consumer.endField(fieldName, fieldId);
-    </#if>
   </#if>
-  <#if mode.prefix == "Repeated">
     }
-    consumer.endField(fieldName, fieldId);
-  </#if>
-    }
-
-    <#if mode.prefix == "Repeated">
-     @Override
-     public void writeListField() {
-      if (reader.size() == 0) {
-        return;
-      }
-      consumer.startField(LIST, ZERO_IDX);
-      for (int i = 0; i < reader.size(); i++) {
-        consumer.startGroup();
-        consumer.startField(ELEMENT, ZERO_IDX);
-
-  <#if minor.class == "TinyInt" ||
-       minor.class == "UInt1" ||
-       minor.class == "UInt2" ||
-       minor.class == "SmallInt" ||
-       minor.class == "Int" ||
-       minor.class == "Time" ||
-       minor.class == "Decimal9" ||
-       minor.class == "UInt4">
-        reader.read(i, holder);
-        consumer.addInteger(holder.value);
-  <#elseif minor.class == "Float4">
-        reader.read(i, holder);
-        consumer.addFloat(holder.value);
-  <#elseif minor.class == "BigInt" ||
-            minor.class == "Decimal18" ||
-            minor.class == "TimeStamp" ||
-            minor.class == "UInt8">
-        reader.read(i, holder);
-        consumer.addLong(holder.value);
-  <#elseif minor.class == "Date">
-        reader.read(i, holder);
-        consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
-  <#elseif minor.class == "Float8">
-        reader.read(i, holder);
-        consumer.addDouble(holder.value);
-  <#elseif minor.class == "Bit">
-        reader.read(i, holder);
-        consumer.addBoolean(holder.value == 1);
-  <#elseif minor.class == "Decimal28Sparse" ||
-            minor.class == "Decimal38Sparse">
-      <#if mode.prefix == "Repeated" >
-      <#else>
-        consumer.startField(fieldName, fieldId);
-        reader.read(holder);
-        byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
-            holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
-        byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
-        if (holder.getSign(holder.start, holder.buffer)) {
-          Arrays.fill(output, 0, output.length - bytes.length, (byte) -1);
-        } else {
-          Arrays.fill(output, 0, output.length - bytes.length, (byte) 0);
-        }
-        System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
-        consumer.addBinary(Binary.fromByteArray(output));
-        consumer.endField(fieldName, fieldId);
-      </#if>
-  <#elseif minor.class?contains("Interval")>
-        consumer.startField(fieldName, fieldId);
-        reader.read(holder);
-      <#if minor.class == "IntervalDay">
-        Arrays.fill(output, 0, 4, (byte) 0);
-        IntervalUtility.intToLEByteArray(holder.days, output, 4);
-        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
-      <#elseif minor.class == "IntervalYear">
-        IntervalUtility.intToLEByteArray(holder.value, output, 0);
-        Arrays.fill(output, 4, 8, (byte) 0);
-        Arrays.fill(output, 8, 12, (byte) 0);
-      <#elseif minor.class == "Interval">
-        IntervalUtility.intToLEByteArray(holder.months, output, 0);
-        IntervalUtility.intToLEByteArray(holder.days, output, 4);
-        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
-      </#if>
-        consumer.addBinary(Binary.fromByteArray(output));
-        consumer.endField(fieldName, fieldId);
-
-  <#elseif
-        minor.class == "TimeTZ" ||
-            minor.class == "Decimal28Dense" ||
-            minor.class == "Decimal38Dense">
-  <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
-            || minor.class == "VarBinary" || minor.class == "VarDecimal">
-        reader.read(i, holder);
-      <#if minor.class == "VarDecimal">
-        decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end,
-            reader.getField().getPrecision());
-      <#else>
-        consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
-      </#if>
-  </#if>
 
-        consumer.endField(ELEMENT, ZERO_IDX);
-        consumer.endGroup();
-      }
-      consumer.endField(LIST, ZERO_IDX);
-     }
-    </#if>
   }
-    </#list>
   </#list>
 </#list>
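
The template change above collapses what used to be a separate writeField
body for every (mode, type) pair into a single per-type converter exposing
read(), read(int) and consume(), with the three handwritten wrappers
(NullableFieldConverter, RequiredFieldConverter, RepeatedFieldConverter)
supplying the null check, the repetition loop and the startField/endField
bracketing. For FLOAT8, for instance, the expanded template comes out
roughly as follows (a sketch of the generated code, not a verbatim copy):

    public class Float8ParquetConverter extends BaseFieldConverter {
      private NullableFloat8Holder holder = new NullableFloat8Holder();

      public Float8ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void read() { reader.read(holder); }         // scalar read

      @Override
      public void read(int i) { reader.read(i, holder); } // repeated read

      @Override
      public void consume() { consumer.addDouble(holder.value); }
    }

    // getNewNullableFloat8Converter then wires it up as:
    //   new NullableFieldConverter(fieldId, fieldName, reader,
    //       new Float8ParquetConverter(fieldId, fieldName, reader));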
 
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index d11c2a14ab..444772b93a 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,7 @@ package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -94,4 +95,9 @@ public interface RecordWriter {
   void postProcessing() throws IOException;
   void abort() throws IOException;
   void cleanup() throws IOException;
+
+  /**
+   * Checks whether this writer supports writing of the given field.
+   */
+  boolean supportsField(MaterializedField field);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index a4b68da561..bcea784051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -249,10 +250,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
   }
 
-  private void newSchema() throws IOException {
+  private void newSchema() {
     List<Type> types = new ArrayList<>();
     for (MaterializedField field : batchSchema) {
-      if (field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+      if (!supportsField(field)) {
         continue;
       }
       types.add(getType(field));
@@ -297,6 +298,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     setUp(schema, consumer);
   }
 
+  @Override
+  public boolean supportsField(MaterializedField field) {
+    return super.supportsField(field)
+      && (field.getType().getMinorType() != MinorType.MAP || field.getChildCount() > 0);
+  }
+
+  @Override
   protected PrimitiveType getPrimitiveType(MaterializedField field) {
     MinorType minorType = field.getType().getMinorType();
     String name = field.getName();
@@ -513,13 +521,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     @Override
     public void writeField() throws IOException {
-      consumer.startField(fieldName, fieldId);
-      consumer.startGroup();
-      for (FieldConverter converter : converters) {
-        converter.writeField();
+      if (!converters.isEmpty()) {
+        consumer.startField(fieldName, fieldId);
+        consumer.startGroup();
+        for (FieldConverter converter : converters) {
+          converter.writeField();
+        }
+        consumer.endGroup();
+        consumer.endField(fieldName, fieldId);
       }
-      consumer.endGroup();
-      consumer.endField(fieldName, fieldId);
     }
   }
 
@@ -683,11 +693,17 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void startRecord() throws IOException {
+    if (CollectionUtils.isEmpty(schema.getFields())) {
+      return;
+    }
     consumer.startMessage();
   }
 
   @Override
   public void endRecord() throws IOException {
+    if (CollectionUtils.isEmpty(schema.getFields())) {
+      return;
+    }
     consumer.endMessage();
 
     // we wait until there is at least one record before creating the parquet file
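
Taken together with the supportsField override above, a childless MAP never
reaches getType() when the schema is built, and when filtering leaves no
writable fields at all, startRecord and endRecord become no-ops instead of
handing Parquet an empty message. For the JSON row used by the new test
below ({"sample": {}, "a": "a"}), the resulting table schema comes out
roughly as follows (illustrative; the exact logical-type annotation depends
on the Parquet version):

    message root {
      optional binary a (UTF8);
    }
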
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 28b009e869..707a552819 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.writer;
 
 import org.apache.calcite.util.Pair;
+import org.apache.commons.io.FileUtils;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.categories.UnlikelyTest;
@@ -57,6 +58,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
 import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -1513,6 +1515,23 @@ public class TestParquetWriter extends ClusterTest {
     }
   }
 
+  @Test
+  public void testResultWithEmptyMap() throws Exception {
+    String fileName = "emptyMap.json";
+
+    FileUtils.writeStringToFile(new File(dirTestWatcher.getRootDir(), fileName),
+      "{\"sample\": {}, \"a\": \"a\"}", Charset.defaultCharset());
+
+    run("create table dfs.tmp.t1 as SELECT * from dfs.`%s` t", fileName);
+
+    testBuilder()
+      .sqlQuery("select * from dfs.tmp.t1")
+      .unOrdered()
+      .baselineColumns("a")
+      .baselineValues("a")
+      .go();
+  }
+
   /**
    * Checks that specified parquet table contains specified columns with specified types.
    *