You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/07/26 01:55:59 UTC
[drill] branch master updated: DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 9c10116bf1 DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)
9c10116bf1 is described below
commit 9c10116bf1a06023ad9053ad774564d0f2264c0b
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Jul 26 04:55:54 2022 +0300
DRILL-8272: Skip MAP column without children when creating parquet tables (#2613)
---
.../codegen/templates/AbstractRecordWriter.java | 7 +
.../codegen/templates/EventBasedRecordWriter.java | 6 +-
.../templates/ParquetOutputRecordWriter.java | 368 +++++++++------------
.../src/main/codegen/templates/RecordWriter.java | 6 +
.../exec/store/parquet/ParquetRecordWriter.java | 32 +-
.../physical/impl/writer/TestParquetWriter.java | 19 ++
6 files changed, 220 insertions(+), 218 deletions(-)
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6982c7586e..d26d35aa81 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -24,6 +24,8 @@ import java.lang.UnsupportedOperationException;
package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.BitVector.Accessor;
@@ -96,4 +98,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
public void postProcessing() throws IOException {
// no op
}
+
+ @Override
+ public boolean supportsField(MaterializedField field) {
+ return !field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD);
+ }
}
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index a541c63fd6..0d2672e861 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import org.apache.drill.exec.planner.physical.WriterPrel;
<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" />
@@ -27,7 +26,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.complex.impl.UnionReader;
@@ -81,7 +79,7 @@ public class EventBasedRecordWriter {
try {
int fieldId = 0;
for (VectorWrapper w : batch) {
- if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+ if (!recordWriter.supportsField(w.getField())) {
continue;
}
FieldReader reader = w.getValueVector().getReader();
@@ -178,4 +176,4 @@ public class EventBasedRecordWriter {
}
throw new UnsupportedOperationException();
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 1f6e467873..3658ecc82f 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -94,15 +94,157 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
protected abstract PrimitiveType getPrimitiveType(MaterializedField field);
+ public abstract class BaseFieldConverter extends FieldConverter {
+
+ public BaseFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ public abstract void read();
+
+ public abstract void read(int i);
+
+ public abstract void consume();
+
+ @Override
+ public void writeField() throws IOException {
+ read();
+ consume();
+ }
+ }
+
+ public class NullableFieldConverter extends FieldConverter {
+ private BaseFieldConverter delegate;
+
+ public NullableFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+ super(fieldId, fieldName, reader);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ if (!reader.isSet()) {
+ return;
+ }
+ consumer.startField(fieldName, fieldId);
+ delegate.writeField();
+ consumer.endField(fieldName, fieldId);
+ }
+
+ public void setPosition(int index) {
+ delegate.setPosition(index);
+ }
+
+ public void startField() throws IOException {
+ delegate.startField();
+ }
+
+ public void endField() throws IOException {
+ delegate.endField();
+ }
+ }
+
+ public class RequiredFieldConverter extends FieldConverter {
+ private BaseFieldConverter delegate;
+
+ public RequiredFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+ super(fieldId, fieldName, reader);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ consumer.startField(fieldName, fieldId);
+ delegate.writeField();
+ consumer.endField(fieldName, fieldId);
+ }
+
+ public void setPosition(int index) {
+ delegate.setPosition(index);
+ }
+
+ public void startField() throws IOException {
+ delegate.startField();
+ }
+
+ public void endField() throws IOException {
+ delegate.endField();
+ }
+ }
+
+ public class RepeatedFieldConverter extends FieldConverter {
+
+ private BaseFieldConverter delegate;
+
+ public RepeatedFieldConverter(int fieldId, String fieldName, FieldReader reader, BaseFieldConverter delegate) {
+ super(fieldId, fieldName, reader);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
+ if (reader.size() == 0) {
+ return;
+ }
+ consumer.startField(fieldName, fieldId);
+ for (int i = 0; i < reader.size(); i++) {
+ delegate.read(i);
+ delegate.consume();
+ }
+ consumer.endField(fieldName, fieldId);
+ }
+
+ @Override
+ public void writeListField() {
+ if (reader.size() == 0) {
+ return;
+ }
+ consumer.startField(LIST, ZERO_IDX);
+ for (int i = 0; i < reader.size(); i++) {
+ consumer.startGroup();
+ consumer.startField(ELEMENT, ZERO_IDX);
+
+ delegate.read(i);
+ delegate.consume();
+
+ consumer.endField(ELEMENT, ZERO_IDX);
+ consumer.endGroup();
+ }
+ consumer.endField(LIST, ZERO_IDX);
+ }
+
+ public void setPosition(int index) {
+ delegate.setPosition(index);
+ }
+
+ public void startField() throws IOException {
+ delegate.startField();
+ }
+
+ public void endField() throws IOException {
+ delegate.endField();
+ }
+ }
+
<#list vv.types as type>
<#list type.minor as minor>
<#list vv.modes as mode>
@Override
public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
- return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader);
+ BaseFieldConverter converter = new ${minor.class}ParquetConverter(fieldId, fieldName, reader);
+ <#if mode.prefix == "Nullable">
+ return new NullableFieldConverter(fieldId, fieldName, reader, converter);
+ <#elseif mode.prefix == "Repeated">
+ return new RepeatedFieldConverter(fieldId, fieldName, reader, converter);
+ <#else>
+ return new RequiredFieldConverter(fieldId, fieldName, reader, converter);
+ </#if>
}
- public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter {
+ </#list>
+
+ public class ${minor.class}ParquetConverter extends BaseFieldConverter {
private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
<#if minor.class?contains("Interval")>
private final byte[] output = new byte[12];
@@ -110,7 +252,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
private final DecimalValueWriter decimalValueWriter;
</#if>
- public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+ public ${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
<#if minor.class == "VarDecimal">
decimalValueWriter = DecimalValueWriter.
@@ -119,20 +261,17 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
}
@Override
- public void writeField() throws IOException {
- <#if mode.prefix == "Nullable" >
- if (!reader.isSet()) {
- return;
- }
- <#elseif mode.prefix == "Repeated" >
- // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
- if (reader.size() == 0) {
- return;
+ public void read() {
+ reader.read(holder);
+ }
+
+ @Override
+ public void read(int i) {
+ reader.read(i, holder);
}
- consumer.startField(fieldName, fieldId);
- for (int i = 0; i < reader.size(); i++) {
- </#if>
+ @Override
+ public void consume() {
<#if minor.class == "TinyInt" ||
minor.class == "UInt1" ||
minor.class == "UInt2" ||
@@ -141,80 +280,28 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
minor.class == "Time" ||
minor.class == "Decimal9" ||
minor.class == "UInt4">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addInteger(holder.value);
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- consumer.addInteger(holder.value);
- consumer.endField(fieldName, fieldId);
- </#if>
+ consumer.addInteger(holder.value);
<#elseif
minor.class == "Float4">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addFloat(holder.value);
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- consumer.addFloat(holder.value);
- consumer.endField(fieldName, fieldId);
- </#if>
+ consumer.addFloat(holder.value);
<#elseif
minor.class == "BigInt" ||
minor.class == "Decimal18" ||
minor.class == "TimeStamp" ||
minor.class == "UInt8">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addLong(holder.value);
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- consumer.addLong(holder.value);
- consumer.endField(fieldName, fieldId);
- </#if>
+ consumer.addLong(holder.value);
<#elseif minor.class == "Date">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
// convert from internal Drill date format to Julian Day centered around Unix Epoch
consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
- consumer.endField(fieldName, fieldId);
- </#if>
<#elseif
minor.class == "Float8">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addDouble(holder.value);
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- consumer.addDouble(holder.value);
- consumer.endField(fieldName, fieldId);
- </#if>
+ consumer.addDouble(holder.value);
<#elseif
minor.class == "Bit">
- <#if mode.prefix == "Repeated" >
- reader.read(i, holder);
- consumer.addBoolean(holder.value == 1);
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- consumer.addBoolean(holder.value == 1);
- consumer.endField(fieldName, fieldId);
- </#if>
+ consumer.addBoolean(holder.value == 1);
<#elseif
minor.class == "Decimal28Sparse" ||
minor.class == "Decimal38Sparse">
- <#if mode.prefix == "Repeated" >
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
@@ -225,11 +312,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
}
System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
consumer.addBinary(Binary.fromByteArray(output));
- consumer.endField(fieldName, fieldId);
- </#if>
<#elseif minor.class?contains("Interval")>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
<#if minor.class == "IntervalDay">
Arrays.fill(output, 0, 4, (byte) 0);
IntervalUtility.intToLEByteArray(holder.days, output, 4);
@@ -244,143 +327,16 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
</#if>
consumer.addBinary(Binary.fromByteArray(output));
- consumer.endField(fieldName, fieldId);
-
- <#elseif
- minor.class == "TimeTZ" ||
- minor.class == "Decimal28Dense" ||
- minor.class == "Decimal38Dense">
- <#if mode.prefix == "Repeated" >
- <#else>
-
- </#if>
- <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
- || minor.class == "VarBinary" || minor.class == "VarDecimal">
- <#if mode.prefix == "Repeated">
- reader.read(i, holder);
- <#if minor.class == "VarDecimal">
- decimalValueWriter.writeValue(consumer, holder.buffer,
- holder.start, holder.end, reader.getField().getPrecision());
- <#else>
- consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
- </#if>
- <#else>
- reader.read(holder);
- consumer.startField(fieldName, fieldId);
- <#if minor.class == "VarDecimal">
+ <#elseif minor.class == "VarDecimal">
decimalValueWriter.writeValue(consumer, holder.buffer,
holder.start, holder.end, reader.getField().getPrecision());
- <#else>
+ <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
+ || minor.class == "VarBinary">
consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
- </#if>
- consumer.endField(fieldName, fieldId);
- </#if>
</#if>
- <#if mode.prefix == "Repeated">
}
- consumer.endField(fieldName, fieldId);
- </#if>
- }
-
- <#if mode.prefix == "Repeated">
- @Override
- public void writeListField() {
- if (reader.size() == 0) {
- return;
- }
- consumer.startField(LIST, ZERO_IDX);
- for (int i = 0; i < reader.size(); i++) {
- consumer.startGroup();
- consumer.startField(ELEMENT, ZERO_IDX);
-
- <#if minor.class == "TinyInt" ||
- minor.class == "UInt1" ||
- minor.class == "UInt2" ||
- minor.class == "SmallInt" ||
- minor.class == "Int" ||
- minor.class == "Time" ||
- minor.class == "Decimal9" ||
- minor.class == "UInt4">
- reader.read(i, holder);
- consumer.addInteger(holder.value);
- <#elseif minor.class == "Float4">
- reader.read(i, holder);
- consumer.addFloat(holder.value);
- <#elseif minor.class == "BigInt" ||
- minor.class == "Decimal18" ||
- minor.class == "TimeStamp" ||
- minor.class == "UInt8">
- reader.read(i, holder);
- consumer.addLong(holder.value);
- <#elseif minor.class == "Date">
- reader.read(i, holder);
- consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
- <#elseif minor.class == "Float8">
- reader.read(i, holder);
- consumer.addDouble(holder.value);
- <#elseif minor.class == "Bit">
- reader.read(i, holder);
- consumer.addBoolean(holder.value == 1);
- <#elseif minor.class == "Decimal28Sparse" ||
- minor.class == "Decimal38Sparse">
- <#if mode.prefix == "Repeated" >
- <#else>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
- holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
- byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
- if (holder.getSign(holder.start, holder.buffer)) {
- Arrays.fill(output, 0, output.length - bytes.length, (byte) -1);
- } else {
- Arrays.fill(output, 0, output.length - bytes.length, (byte) 0);
- }
- System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
- consumer.addBinary(Binary.fromByteArray(output));
- consumer.endField(fieldName, fieldId);
- </#if>
- <#elseif minor.class?contains("Interval")>
- consumer.startField(fieldName, fieldId);
- reader.read(holder);
- <#if minor.class == "IntervalDay">
- Arrays.fill(output, 0, 4, (byte) 0);
- IntervalUtility.intToLEByteArray(holder.days, output, 4);
- IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
- <#elseif minor.class == "IntervalYear">
- IntervalUtility.intToLEByteArray(holder.value, output, 0);
- Arrays.fill(output, 4, 8, (byte) 0);
- Arrays.fill(output, 8, 12, (byte) 0);
- <#elseif minor.class == "Interval">
- IntervalUtility.intToLEByteArray(holder.months, output, 0);
- IntervalUtility.intToLEByteArray(holder.days, output, 4);
- IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
- </#if>
- consumer.addBinary(Binary.fromByteArray(output));
- consumer.endField(fieldName, fieldId);
-
- <#elseif
- minor.class == "TimeTZ" ||
- minor.class == "Decimal28Dense" ||
- minor.class == "Decimal38Dense">
- <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
- || minor.class == "VarBinary" || minor.class == "VarDecimal">
- reader.read(i, holder);
- <#if minor.class == "VarDecimal">
- decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end,
- reader.getField().getPrecision());
- <#else>
- consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
- </#if>
- </#if>
- consumer.endField(ELEMENT, ZERO_IDX);
- consumer.endGroup();
- }
- consumer.endField(LIST, ZERO_IDX);
- }
- </#if>
}
- </#list>
</#list>
</#list>
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index d11c2a14ab..444772b93a 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,7 @@ package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -94,4 +95,9 @@ public interface RecordWriter {
void postProcessing() throws IOException;
void abort() throws IOException;
void cleanup() throws IOException;
+
+ /**
+ * Checks whether this writer supports writing of the given field.
+ */
+ boolean supportsField(MaterializedField field);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index a4b68da561..bcea784051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -249,10 +250,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
}
- private void newSchema() throws IOException {
+ private void newSchema() {
List<Type> types = new ArrayList<>();
for (MaterializedField field : batchSchema) {
- if (field.getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+ if (!supportsField(field)) {
continue;
}
types.add(getType(field));
@@ -297,6 +298,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
setUp(schema, consumer);
}
+ @Override
+ public boolean supportsField(MaterializedField field) {
+ return super.supportsField(field)
+ && (field.getType().getMinorType() != MinorType.MAP || field.getChildCount() > 0);
+ }
+
+ @Override
protected PrimitiveType getPrimitiveType(MaterializedField field) {
MinorType minorType = field.getType().getMinorType();
String name = field.getName();
@@ -513,13 +521,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void writeField() throws IOException {
- consumer.startField(fieldName, fieldId);
- consumer.startGroup();
- for (FieldConverter converter : converters) {
- converter.writeField();
+ if (!converters.isEmpty()) {
+ consumer.startField(fieldName, fieldId);
+ consumer.startGroup();
+ for (FieldConverter converter : converters) {
+ converter.writeField();
+ }
+ consumer.endGroup();
+ consumer.endField(fieldName, fieldId);
}
- consumer.endGroup();
- consumer.endField(fieldName, fieldId);
}
}
@@ -683,11 +693,17 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void startRecord() throws IOException {
+ if (CollectionUtils.isEmpty(schema.getFields())) {
+ return;
+ }
consumer.startMessage();
}
@Override
public void endRecord() throws IOException {
+ if (CollectionUtils.isEmpty(schema.getFields())) {
+ return;
+ }
consumer.endMessage();
// we wait until there is at least one record before creating the parquet file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 28b009e869..707a552819 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.writer;
import org.apache.calcite.util.Pair;
+import org.apache.commons.io.FileUtils;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.categories.UnlikelyTest;
@@ -57,6 +58,7 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
@@ -1513,6 +1515,23 @@ public class TestParquetWriter extends ClusterTest {
}
}
+ @Test
+ public void testResultWithEmptyMap() throws Exception {
+ String fileName = "emptyMap.json";
+
+ FileUtils.writeStringToFile(new File(dirTestWatcher.getRootDir(), fileName),
+ "{\"sample\": {}, \"a\": \"a\"}", Charset.defaultCharset());
+
+ run("create table dfs.tmp.t1 as SELECT * from dfs.`%s` t", fileName);
+
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.t1")
+ .unOrdered()
+ .baselineColumns("a")
+ .baselineValues("a")
+ .go();
+ }
+
/**
* Checks that specified parquet table contains specified columns with specified types.
*