Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/19 10:50:42 UTC

[2/3] drill git commit: DRILL-5544: Out of heap running CTAS against text delimited

DRILL-5544: Out of heap running CTAS against text delimited

The Parquet version of PageWriter does not allow using direct memory for allocating ByteBuffers,
so this PR introduces an alternative version of PageWriter and PageWriteStore.
See https://issues.apache.org/jira/browse/PARQUET-1006 for details.

closes #846
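
In short, the change swaps the heap-backed ColumnChunkPageWriteStore (previously obtained through
ColumnChunkPageWriteStoreExposer) for a Drill-side copy whose page buffers come from a pluggable
ByteBuffer allocator. The sketch below is illustrative only: it uses the constructor and methods
added in this commit, and assumes the parameters (compressor, schema, buffer sizes, operator
context, file writer) are prepared the same way ParquetRecordWriter prepares them in the diff below.

    import java.io.IOException;

    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.schema.MessageType;

    /** Illustrative sketch of the new page store wiring. */
    class DirectPageStoreSketch {
      static void writeRowGroup(BytesCompressor compressor, MessageType schema, int initialSlabSize,
                                int pageSize, OperatorContext oContext,
                                ParquetFileWriter parquetFileWriter) throws IOException {
        // Page buffers are now backed by Drill's direct memory instead of on-heap byte arrays.
        ParquetColumnChunkPageWriteStore pageStore = new ParquetColumnChunkPageWriteStore(
            compressor, schema, initialSlabSize, pageSize,
            new ParquetDirectByteBufferAllocator(oContext));
        try {
          // Pages are produced through a ColumnWriteStoreV1 built on top of pageStore (see the diff),
          // then the finished column chunks are copied into the Parquet file:
          pageStore.flushToFileWriter(parquetFileWriter);
        } finally {
          pageStore.close(); // release the buffers even if the flush fails
        }
      }
    }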


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b714b2d7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b714b2d7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b714b2d7

Branch: refs/heads/master
Commit: b714b2d74f80755d02de87b3151d94cb9cfc6794
Parents: 01bcb78
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Thu May 25 17:10:55 2017 +0000
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:19:55 2017 +0300

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetRecordWriter.java |  61 ++--
 .../ColumnChunkPageWriteStoreExposer.java       |  47 ----
 .../ParquetColumnChunkPageWriteStore.java       | 280 +++++++++++++++++++
 .../physical/impl/writer/TestParquetWriter.java |   4 +-
 4 files changed, 318 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index bc495a3..1d4d161 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -50,12 +50,12 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
-import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ColumnChunkPageWriteStoreExposer;
+import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.ColumnIOFactory;
@@ -100,7 +100,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
   private ColumnWriteStore store;
-  private PageWriteStore pageStore;
+  private ParquetColumnChunkPageWriteStore pageStore;
 
   private RecordConsumer consumer;
   private BatchSchema batchSchema;
@@ -206,11 +206,21 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
     schema = new MessageType("root", types);
 
+    // We don't want this number to be too small; ideally we divide the block equally across the columns.
+    // It is unlikely that all columns are going to be the same size.
+    // The value is likely below Integer.MAX_VALUE (2GB), although the row group size is a long.
+    // Therefore this size is cast to int, since the underlying byte array allocation
+    // limits the array size to the int range.
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
-    pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
-        codecFactory.getCompressor(codec),
-        schema);
+    // We don't want this number to be too small either; ideally it is slightly bigger than
+    // the page size, but not bigger than the block buffer.
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+    // TODO: Use initialSlabSize from ParquetProperties once Drill is updated to the latest version of the Parquet library
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
+    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from the Parquet library
+    // once PARQUET-1006 is resolved
+    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
+        pageSize, new ParquetDirectByteBufferAllocator(oContext));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
         writerVersion, new ParquetDirectByteBufferAllocator(oContext));
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
@@ -263,26 +273,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   private void flush() throws IOException {
-    if (recordCount > 0) {
-      parquetFileWriter.startBlock(recordCount);
-      consumer.flush();
-      store.flush();
-      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-
-      // we are writing one single block per file
-      parquetFileWriter.end(extraMetaData);
-      parquetFileWriter = null;
-    }
-
-    store.close();
-    // TODO(jaltekruse) - review this close method should no longer be necessary
-//    ColumnChunkPageWriteStoreExposer.close(pageStore);
+    try {
+      if (recordCount > 0) {
+        parquetFileWriter.startBlock(recordCount);
+        consumer.flush();
+        store.flush();
+        pageStore.flushToFileWriter(parquetFileWriter);
+        recordCount = 0;
+        parquetFileWriter.endBlock();
+
+        // we are writing one single block per file
+        parquetFileWriter.end(extraMetaData);
+        parquetFileWriter = null;
+      }
+    } finally {
+      store.close();
+      pageStore.close();
 
-    store = null;
-    pageStore = null;
-    index++;
+      store = null;
+      pageStore = null;
+      index++;
+    }
   }
 
   private void checkBlockSizeReached() throws IOException {
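
For a feel of the sizes involved, here is a small self-contained arithmetic sketch of the three
heuristics above. The concrete numbers (512 MB block size, 1 MB page size, 64 KB minimum buffer,
10 columns) are illustrative assumptions, not values taken from this commit; only the formulas and
the initialSlabSizeHeuristic call mirror the hunk above.

    import static java.lang.Math.max;
    import static java.lang.Math.min;

    import org.apache.parquet.bytes.CapacityByteArrayOutputStream;

    /** Illustrative numbers only; the real values come from Drill's session options. */
    class BufferSizingSketch {
      public static void main(String[] args) {
        final int MINIMUM_BUFFER_SIZE = 64 * 1024;   // assumed minimum, for illustration
        long blockSize = 512L * 1024 * 1024;         // assumed row group (block) size: 512 MB
        int pageSize = 1024 * 1024;                  // assumed page size: 1 MB
        int columns = 10;                            // assumed column count

        // Same formulas as in ParquetRecordWriter above.
        int initialBlockBufferSize = (int) max(MINIMUM_BUFFER_SIZE, blockSize / columns / 5); // ~10.7 MB
        int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE,
            min(pageSize + pageSize / 10, initialBlockBufferSize));                           // ~1.1 MB
        int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);

        System.out.printf("block buffer=%d, page buffer=%d, first slab=%d%n",
            initialBlockBufferSize, initialPageBufferSize, initialSlabSize);
      }
    }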

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
deleted file mode 100644
index 564a0a4..0000000
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.parquet.hadoop;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-
-import org.apache.parquet.column.page.PageWriteStore;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
-import org.apache.parquet.schema.MessageType;
-
-public class ColumnChunkPageWriteStoreExposer {
-
-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
-      OperatorContext oContext,
-      BytesCompressor compressor,
-      MessageType schema
-      ) {
-    return new ColumnChunkPageWriteStore(compressor, schema, new ParquetDirectByteBufferAllocator(oContext));
-  }
-
-  public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException {
-    ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
-  }
-
-  // TODO(jaltekruse) - review, this used to have a method for closing a pageStore
-  // the parquet code once rebased did not include this close method, make sure it isn't needed
-  // I might have messed up the merge
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..2d8e27d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a copy of ColumnChunkPageWriteStore from the Parquet library, except for the OutputStream used here.
+ * Using CapacityByteArrayOutputStream allows different ByteBuffer allocators to be used.
+ * This class will no longer be needed once PARQUET-1006 is resolved.
+ */
+public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap();
+  private final MessageType schema;
+
+  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
+                                          MessageType schema,
+                                          int initialSlabSize,
+                                          int maxCapacityHint,
+                                          ByteBufferAllocator allocator) {
+    this.schema = schema;
+    for (ColumnDescriptor path : schema.getColumns()) {
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator));
+    }
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
+  /**
+   * Writes the column chunks of the corresponding row group to the file
+   * @param writer the parquet file writer
+   * @throws IOException if the column chunks cannot be written to the file
+   */
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    for (ColumnDescriptor path : schema.getColumns()) {
+      ColumnChunkPageWriter pageWriter = writers.get(path);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+  @Override
+  public void close() {
+    for (ColumnChunkPageWriter pageWriter : writers.values()) {
+      pageWriter.close();
+    }
+  }
+
+  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final CapacityByteArrayOutputStream buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    // repetition and definition level encodings are used only for v1 pages and don't change
+    private Set<Encoding> rlEncodings = Sets.newHashSet();
+    private Set<Encoding> dlEncodings = Sets.newHashSet();
+    private List<Encoding> dataEncodings = Lists.newArrayList();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path,
+                                  BytesCompressor compressor,
+                                  int initialSlabSize,
+                                  int maxCapacityHint,
+                                  ByteBufferAllocator allocator) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
+      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      // Parquet library creates bad metadata if the uncompressed or compressed size of a page exceeds Integer.MAX_VALUE
+      if (uncompressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
+                uncompressedSize);
+      }
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      if (compressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+                + compressedSize);
+      }
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      compressedBytes.writeAllTo(buf);
+      rlEncodings.add(rlEncoding);
+      dlEncodings.add(dlEncoding);
+      dataEncodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePageV2(int rowCount,
+                            int nullCount,
+                            int valueCount,
+                            BytesInput repetitionLevels,
+                            BytesInput definitionLevels,
+                            Encoding dataEncoding,
+                            BytesInput data,
+                            Statistics<?> statistics) throws IOException {
+      int rlByteLength = toIntWithCheck(repetitionLevels.size());
+      int dlByteLength = toIntWithCheck(definitionLevels.size());
+      int uncompressedSize = toIntWithCheck(
+          data.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      BytesInput compressedData = compressor.compress(data);
+      int compressedSize = toIntWithCheck(
+          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize, compressedSize,
+          valueCount, nullCount, rowCount,
+          statistics,
+          dataEncoding,
+          rlByteLength,
+          dlByteLength,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+
+      definitionLevels.writeAllTo(buf);
+      compressedData.writeAllTo(buf);
+
+      dataEncodings.add(dataEncoding);
+    }
+
+    private int toIntWithCheck(long size) {
+      if (size > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+                size);
+      }
+      return (int)size;
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    /**
+     * Writes this column chunk's accumulated pages to the file
+     * @param writer the parquet file writer
+     * @throws IOException if the pages cannot be written to the file
+     */
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        // tracking the dictionary encoding is handled in writeDictionaryPage
+      }
+      List<Encoding> encodings = Lists.newArrayList();
+      encodings.addAll(rlEncodings);
+      encodings.addAll(dlEncodings);
+      encodings.addAll(dataEncodings);
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+      writer.endColumn();
+      logger.debug(
+          String.format(
+              "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+              buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, Sets.newHashSet(dataEncodings))
+              + (dictionaryPage != null ? String.format(
+              ", dic { %,d entries, %,dB raw, %,dB comp}",
+              dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+              : ""));
+      rlEncodings.clear();
+      dlEncodings.clear();
+      dataEncodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.getCapacity();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
+          dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+
+    @Override
+    public void close() {
+      buf.close();
+    }
+  }
+
+}
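
Everything the new store buffers goes through CapacityByteArrayOutputStream, so where the memory
comes from is decided entirely by the ByteBufferAllocator passed to the constructor. As a rough
illustration of that contract only (this is a hand-rolled demonstration allocator, not Drill's
ParquetDirectByteBufferAllocator, and it assumes the parquet-common ByteBufferAllocator interface
consists of allocate(int), release(ByteBuffer) and isDirect()):

    import java.nio.ByteBuffer;

    import org.apache.parquet.bytes.ByteBufferAllocator;

    /**
     * Demonstration allocator: shows the contract the page write store relies on. Drill's own
     * ParquetDirectByteBufferAllocator instead draws on the operator's memory allocator rather
     * than calling ByteBuffer.allocateDirect directly.
     */
    class SimpleDirectAllocator implements ByteBufferAllocator {

      @Override
      public ByteBuffer allocate(int size) {
        // Buffers live outside the Java heap, which is what keeps large CTAS writes off the heap.
        return ByteBuffer.allocateDirect(size);
      }

      @Override
      public void release(ByteBuffer buffer) {
        // Direct buffers are reclaimed with their ByteBuffer objects; a real allocator frees them eagerly.
      }

      @Override
      public boolean isDirect() {
        return true;
      }
    }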

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index e5c6ce4..3c174ae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -760,7 +760,7 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   /*
-  Test the reading of a binary field as drill varbinary where data is in dicationary _and_ non-dictionary encoded pages
+  Test the reading of a binary field as drill varbinary where data is in dictionary _and_ non-dictionary encoded pages
    */
   @Test
   public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception {
@@ -768,7 +768,7 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   /*
-  Test the reading of a binary field as drill timestamp where data is in dicationary _and_ non-dictionary encoded pages
+  Test the reading of a binary field as drill timestamp where data is in dictionary _and_ non-dictionary encoded pages
    */
   @Test
   @Ignore("relies on particular time zone, works for UTC")