Posted to commits@parquet.apache.org by ga...@apache.org on 2019/03/13 07:26:04 UTC

[parquet-mr] branch master updated: PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (#620)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 892dedb  PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (#620)
892dedb is described below

commit 892dedb23591bb4e38a061d5ea607637fd4e210f
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Wed Mar 13 08:25:56 2019 +0100

    PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (#620)
---
 .../benchmarks/NestedNullWritingBenchmarks.java    | 151 +++++++++++++++++++++
 .../apache/parquet/column/ColumnWriteStore.java    |  12 +-
 .../parquet/column/impl/ColumnWriteStoreBase.java  |   5 +
 .../parquet/column/impl/ColumnWriterBase.java      |   3 +
 .../org/apache/parquet/io/MessageColumnIO.java     |   7 +
 .../hadoop/example/ExampleParquetWriter.java       |  16 +++
 .../apache/parquet/hadoop/TestParquetWriter.java   |  43 +++++-
 7 files changed, 235 insertions(+), 2 deletions(-)
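
In short, the root cause: MessageColumnIO caches runs of nulls for nested fields and only pushes them to the column writers on flush, while the column write store closes pages based on row counts alone. With a low page row count limit, a page could therefore be closed while all of its values were still sitting in the cache, producing an empty page. A minimal sketch of the failing setup, assuming the example object model and the static imports used in the regression test at the end of this commit:

    MessageType schema = Types.buildMessage()
        .optionalList().optionalElement(BINARY).as(stringType()).named("str_list")
        .named("msg");
    Group allNull = new SimpleGroupFactory(schema).newGroup(); // every field is null
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) // path: any org.apache.hadoop.fs.Path
        .withPageRowCountLimit(10) // force a page boundary every 10 rows
        .withType(schema)
        .build()) {
      for (int i = 0; i < 100; ++i) {
        writer.write(allNull); // the nulls never leave the MessageColumnIO cache on their own
      }
    }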

diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
new file mode 100644
index 0000000..324775b
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
@@ -0,0 +1,151 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.openjdk.jmh.annotations.Mode.SingleShotTime;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmark to measure writing nested null values. (See PARQUET-343 for details.)
+ * <p>
+ * To execute this benchmark, build the jar file of this module and run it with the JMH
+ * framework.<br>
+ * The following one-liner (to be executed in the parquet-benchmarks submodule) generates result statistics in the
+ * file {@code jmh-result.json}. The resulting JSON can be visualized using the tool at
+ * <a href="https://jmh.morethan.io">https://jmh.morethan.io</a>.
+ *
+ * <pre>
+ * mvn clean package &amp;&amp; java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.NestedNullWritingBenchmarks -rf json
+ * </pre>
+ */
+@BenchmarkMode(SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 10, batchSize = 1)
+@Measurement(iterations = 50, batchSize = 1)
+@OutputTimeUnit(MILLISECONDS)
+@State(Benchmark)
+public class NestedNullWritingBenchmarks {
+  private static final MessageType SCHEMA = Types.buildMessage()
+      .optionalList()
+      .optionalElement(INT32)
+      .named("int_list")
+      .optionalList()
+      .optionalListElement()
+      .optionalElement(BINARY)
+      .named("dummy_list")
+      .optionalMap()
+      .key(BINARY)
+      .value(BINARY, OPTIONAL)
+      .named("dummy_map")
+      .optionalGroup()
+      .optional(BINARY).named("dummy_group_value1")
+      .optional(BINARY).named("dummy_group_value2")
+      .optional(BINARY).named("dummy_group_value3")
+      .named("dummy_group")
+      .named("msg");
+  private static final int RECORD_COUNT = 10_000_000;
+  private static final double NULL_RATIO = 0.99;
+  private static final OutputFile BLACK_HOLE = new OutputFile() {
+    @Override
+    public boolean supportsBlockSize() {
+      return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+      return -1L;
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+      return create(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) {
+      return new PositionOutputStream() {
+        private long pos;
+
+        @Override
+        public long getPos() throws IOException {
+          return pos;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+          ++pos;
+        }
+      };
+    }
+  };
+
+  private static class ValueGenerator {
+    private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
+    private static final Group NULL = FACTORY.newGroup();
+    private final Random random = new Random(42);
+
+    public Group nextValue() {
+      if (random.nextDouble() > NULL_RATIO) {
+        Group group = FACTORY.newGroup();
+        group.addGroup("int_list").addGroup("list").append("element", random.nextInt());
+        return group;
+      } else {
+        return NULL;
+      }
+    }
+  }
+
+  @Benchmark
+  public void benchmarkWriting() throws IOException {
+    ValueGenerator generator = new ValueGenerator();
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(BLACK_HOLE)
+        .withWriteMode(Mode.OVERWRITE)
+        .withType(SCHEMA)
+        .build()) {
+      for (int i = 0; i < RECORD_COUNT; ++i) {
+        writer.write(generator.nextValue());
+      }
+    }
+  }
+}
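
Design note: the BLACK_HOLE OutputFile above discards every byte and only tracks the stream position, so the benchmark measures the CPU cost of the write path (including the null caching and the new flush logic) rather than disk or codec I/O.
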
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
index e14c7dc..db5320a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
@@ -52,7 +52,7 @@ public interface ColumnWriteStore {
   abstract public long getBufferedSize();
 
   /**
-   * used for debugging pupose
+   * used for debugging purpose
    * @return a formated string representing memory usage per column
    */
   abstract public String memUsageString();
@@ -62,4 +62,14 @@ public interface ColumnWriteStore {
    */
   abstract public void close();
 
+  /**
+   * Returns whether flushing the possibly cached values (or nulls) to the underlying column writers is necessary,
+   * because the pages might be closed after the next invocation of {@link #endRecord()}.
+   *
+   * @return {@code true} if all the values shall be written to the underlying column writers before calling
+   *         {@link #endRecord()}
+   */
+  default boolean isColumnFlushNeeded() {
+    return false;
+  }
 }
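
The default implementation returns false, so existing ColumnWriteStore implementations keep their old behavior unless they opt in. A sketch of the intended caller pattern (the MessageColumnIO change below does exactly this):

    if (columns.isColumnFlushNeeded()) {
      flush();             // push any cached values/nulls down to the column writers
    }
    columns.endRecord();   // may now close pages without leaving cached values behind
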
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index ac9aaca..2670c31 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -229,4 +229,9 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
       rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
     }
   }
+
+  @Override
+  public boolean isColumnFlushNeeded() {
+    return rowCount + 1 >= rowCountForNextSizeCheck;
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 3788c82..8fc7d31 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -305,6 +305,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
    * Writes the current data to a new page in the page store
    */
   void writePage() {
+    if (valueCount == 0) {
+      throw new ParquetEncodingException("writing empty page");
+    }
     this.rowsWrittenSoFar += pageRowCount;
     if (DEBUG)
       LOG.debug("write page");
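
With the MessageColumnIO change below, this state should be unreachable; the new check turns a silently written empty page into an immediate ParquetEncodingException.
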
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index f1da363..8fc4f91 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -297,6 +297,13 @@ public class MessageColumnIO extends GroupColumnIO {
     @Override
     public void endMessage() {
       writeNullForMissingFieldsAtCurrentLevel();
+
+      // We need to flush the cached null values before ending the record to ensure that everything is sent to the
+      // writer before the current page is closed
+      if (columns.isColumnFlushNeeded()) {
+        flush();
+      }
+
       columns.endRecord();
       if (DEBUG) log("< MESSAGE END >");
       if (DEBUG) printState();
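
Putting the pieces together with a page row count limit of 10: during the 10th record's endMessage(), isColumnFlushNeeded() reports that the upcoming endRecord() may close the current pages, flush() pushes the cached nulls down to the column writers, and the page is then closed containing its 10 values instead of none.
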
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 88879c2..12a67d3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -25,6 +25,7 @@ import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
 import java.util.HashMap;
@@ -48,6 +49,17 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
   }
 
   /**
+   * Creates a Builder for configuring ParquetWriter with the example object
+   * model. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
+   *
+   * @param file the output file to create
+   * @return a {@link Builder} to create a {@link ParquetWriter}
+   */
+  public static Builder builder(OutputFile file) {
+    return new Builder(file);
+  }
+
+  /**
    * Create a new {@link ExampleParquetWriter}.
    *
    * @param file The file name to write to.
@@ -78,6 +90,10 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
       super(file);
     }
 
+    private Builder(OutputFile file) {
+      super(file);
+    }
+
     public Builder withType(MessageType type) {
       this.type = type;
       return this;
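
The new OutputFile-based overload is what the benchmark above uses with its in-memory BLACK_HOLE. A hedged usage sketch, where outputFile stands for any org.apache.parquet.io.OutputFile implementation and schema/group are a matching MessageType and example Group:

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(outputFile)
        .withWriteMode(Mode.OVERWRITE) // OutputFile targets are typically overwritten, as in the benchmark
        .withType(schema)
        .build()) {
      writer.write(group);
    }
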
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 6fc3c72..25c9608 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop;
 import static java.util.Arrays.asList;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
 import static org.apache.parquet.column.Encoding.PLAIN;
@@ -32,20 +33,24 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.example.ExampleInputFormat;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,6 +59,7 @@ import org.junit.Test;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
@@ -166,4 +172,39 @@ public class TestParquetWriter {
     Assert.assertFalse("Should not create a file when schema is rejected",
         file.exists());
   }
+
+  // Testing the issue of PARQUET-1531 where writing null nested rows leads to empty pages if the page row count limit
+  // is reached.
+  @Test
+  public void testNullValuesWithPageRowLimit() throws IOException {
+    MessageType schema = Types.buildMessage().optionalList().optionalElement(BINARY).as(stringType()).named("str_list")
+        .named("msg");
+    final int recordCount = 100;
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    Group listNull = factory.newGroup();
+
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withPageRowCountLimit(10)
+        .withConf(conf)
+        .build()) {
+      for (int i = 0; i < recordCount; ++i) {
+        writer.write(listNull);
+      }
+    }
+
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
+      int readRecordCount = 0;
+      for (Group group = reader.read(); group != null; group = reader.read()) {
+        assertEquals(listNull.toString(), group.toString());
+        ++readRecordCount;
+      }
+      assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
+    }
+  }
 }