Posted to dev@drill.apache.org by vdiravka <gi...@git.apache.org> on 2017/05/26 16:01:44 UTC

[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

GitHub user vdiravka opened a pull request:

    https://github.com/apache/drill/pull/846

    DRILL-5544: Out of heap running CTAS against text delimited

    - The Parquet version of PageWriter does not allow direct memory to be used for allocating ByteBuffers,
      so this PR introduces alternative versions of PageWriter and PageWriteStore.
      See more: https://issues.apache.org/jira/browse/PARQUET-1006

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vdiravka/drill DRILL-5544

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #846
    
----
commit fefdf877a467ed8e8af598029c679e17856b38c6
Author: Vitalii Diravka <vi...@gmail.com>
Date:   2017-05-25T17:10:55Z

    DRILL-5544: Out of heap running CTAS against text delimited
    - The Parquet version of PageWriter does not allow direct memory to be used for allocating ByteBuffers,
      so this PR introduces alternative versions of PageWriter and PageWriteStore. See more: https://issues.apache.org/jira/browse/PARQUET-1006

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/846


[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on the issue:

    https://github.com/apache/drill/pull/846
  
    Commits are squashed into one.


[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/846
  
    Chatted with Parth who mentioned that Parquet page sizes are typically on the order of 1MB, maybe 8 MB, but 16 MB is too large.
    
    The concern expressed in earlier comments was that if we buffer, say, 256 MB of data per file, and we're doing many parallel writes, we will use up too much memory.
    
    But, if we buffer only one page at a time, and we control page size to be some amount on the order of 1-2 MB, then even with 100 threads, we're still using only 200 MB, say, which is fine.
    
    In this case, the direct memory solution is fine. (But please check performance.)
    
    However, if we are running out of memory, I wonder if we are not controlling page size and letting them get too large? Did you happen to check the size of the pages we are writing?
    
    If the pages are too big, let's file another JIRA ticket to fix that problem so that we have a complete solution.
    
    Once we confirm that we are writing small pages (or file that JIRA if not), I'll change my vote from +0 to +1.
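
As a rough illustration of the arithmetic in this comment, the sketch below multiplies the number of concurrent writers by the amount each one buffers. The class and method names are hypothetical, and the model assumes every writer holds exactly the stated amount at the same time, which is a simplification.

```java
/** Back-of-the-envelope estimate of write-buffer memory; all names are hypothetical. */
final class WriteBufferEstimate {
  static final long MB = 1024L * 1024L;

  /** Memory held if every concurrent writer buffers bytesPerWriter bytes at once. */
  static long totalBytes(int writers, long bytesPerWriter) {
    // multiplyExact fails loudly on overflow instead of wrapping around.
    return Math.multiplyExact((long) writers, bytesPerWriter);
  }

  public static void main(String[] args) {
    // One 2 MB page per writer, 100 writers: prints "200 MB".
    System.out.println(totalBytes(100, 2 * MB) / MB + " MB");
    // 256 MB buffered per writer, 100 writers: prints "25600 MB".
    System.out.println(totalBytes(100, 256 * MB) / MB + " MB");
  }
}
```

The two calls contrast the per-page and per-file buffering cases mentioned above; the gap between them is why the amount buffered per writer matters more than whether the memory is heap or direct.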


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119836279
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    Your first case is correct: 10 runs * 512 MB (the default Parquet block size in Drill) = 5 GB, and 40 runs need 20 GB of memory. That is too much for heap memory, but acceptable for direct memory. We can also control how much data is buffered before it is written to disk with the `store.parquet.block-size` option.


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119187078
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    A broader question. Is this code trying to write all of the Parquet chunk data to a memory buffer before writing to disk? What is the maximum file size? The size of one chunk? (256 MB or 512 MB?) Or, the size of the whole multi-chunk file? Many GB?
    
    Now, suppose this runs in parallel. Suppose we run, say, 10 of these at once. Would we need 10 * 256 MB memory = 2.5 GB? Or, if the file is 4 GB, would we need 10 * 4 GB = 40GB of memory to write the files concurrently?
    
    If so, I wonder if the problem is more than just using direct memory vs. heap. The problem is trying to buffer data in memory that properly belongs on disk. Perhaps the Parquet format is not designed for efficient writes. Is that the case? What prevents efficient writes and forces buffering?
    
    Given that, should we have a spill-to-disk solution? Write blocks to a temp file, then assemble them into the final file?
    
    All this needs to be considered because, as we try to manage memory, having a write that makes unlimited use of direct memory will be a problem.
    
    Perhaps add some notes somewhere to explain how Parquet does the writes. Or, if that is already described in Parquet somewhere, please just include a link to that description.
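
The spill-to-disk idea raised above could look roughly like the following sketch: each finished block goes to its own temporary file, and the final file is assembled by concatenation. This is only an illustration of the general pattern. `SpillingBlockWriter` and its methods are hypothetical, nothing here is Drill or Parquet API, and it ignores Parquet specifics such as the footer metadata that records block offsets.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: spill blocks to temp files, then assemble the final file. */
final class SpillingBlockWriter {
  private final List<Path> spilled = new ArrayList<>();

  /** Writes one finished block to its own temp file instead of keeping it in memory. */
  void spill(byte[] block) {
    try {
      Path tmp = Files.createTempFile("block-", ".spill");
      Files.write(tmp, block);
      spilled.add(tmp);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Concatenates the spilled blocks, in order, into target and deletes the temp files. */
  void assemble(Path target) {
    try (OutputStream out = Files.newOutputStream(target)) {
      for (Path p : spilled) {
        Files.copy(p, out);
        Files.delete(p);
      }
      spilled.clear();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Convenience for trying the sketch: spills the blocks, assembles them, returns the result. */
  static byte[] roundTrip(byte[]... blocks) {
    SpillingBlockWriter writer = new SpillingBlockWriter();
    for (byte[] block : blocks) {
      writer.spill(block);
    }
    try {
      Path target = Files.createTempFile("assembled-", ".bin");
      writer.assemble(target);
      byte[] result = Files.readAllBytes(target);
      Files.delete(target);
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The trade-off is extra disk I/O (every block is written twice) in exchange for memory use bounded by a single block per writer.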


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r120694784
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    I meant the 20 GB figure as a special case. Usually 40 threads are used only on sufficiently powerful machines with enough RAM. But we can control memory usage by reducing the number of threads per node to prevent OOM while buffering data.
    
    512 MB is the default `store.parquet.block-size`, but according to the [Drill doc](https://drill.apache.org/docs/parquet-format/#configuring-the-size-of-parquet-files), it is recommended to set this value to a number of bytes less than or equal to the block size of MFS, HDFS, or the file system. Then only 320K will be buffered in memory. 
    
    There is nothing wrong with `BufferedOutputStream`. The original `ColumnChunkPageWriter` uses it, but together with `ConcatenatingByteArrayCollector` (which can buffer data only in a `List<byte[]>`). I suppose it is possible to upgrade the latter class to use `ByteBuffers`, but the result would be similar to `CapacityByteArrayOutputStream`.
    
    As I understand it, buffering the data allows an entire block to be written to disk without going back and writing anything else, ideally one row group per file system block. It looks suitable for HDFS as well.


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119223453
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    --- End diff --
    
    Compared this file to the Parquet original (thanks for providing the file name) using [this tool](https://www.diffchecker.com/diff), referencing the diagram [here](https://parquet.apache.org/documentation/latest/). The changes made in the Drill copy seem reasonable.
    
    I do have questions, however, about the approach to writing. (See below.) Seems overly memory intensive. But, this is an issue with the Parquet original, not about this "port" of the file.


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119835458
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    To avoid frequent I/O writes to the disk, the Parquet library buffers every page in an `OutputStream` and writes the data only when the `ColumnWriteStore` size reaches the block size [[link](https://github.com/apache/drill/blob/874bf6296dcd1a42c7cf7f097c1a6b5458010cbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java#L288)]. With a large block size and a large number of runs, a lot of memory can be used before the data is flushed. 
    Using ByteBuffers makes it possible to choose which type of memory to use. 
    
    `ColumnChunkPageWriter` used `CapacityByteArrayOutputStream` instead of `ByteArrayOutputStream` and `ConcatenatingByteArrayCollector` before the [PARQUET-160 fix](https://github.com/apache/parquet-mr/pull/98/commits/8b54667650873c03ea66721d0f06bfad0b968f19). In fact, `ByteBufferAllocator` was introduced in PARQUET-77 but was never used in `ColumnChunkPageWriter`.
    
    See the explanation of why direct memory is more suitable for buffering than heap space in the next comment.
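
To make the buffering behaviour described in this comment concrete, here is a minimal sketch in plain Java, assuming a simplified model: pages are copied into `ByteBuffer`s obtained from a pluggable allocator and released once the buffered total reaches a block-size threshold. `BufferedPages` and its methods are hypothetical stand-ins, not the Parquet or Drill classes; the only real APIs used are `ByteBuffer.allocate` (heap) and `ByteBuffer.allocateDirect` (direct memory).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical sketch: buffers pages until a block-size threshold is reached, then flushes. */
final class BufferedPages {
  // The allocator decides the memory type, e.g. ByteBuffer::allocate or ByteBuffer::allocateDirect.
  private final IntFunction<ByteBuffer> allocator;
  private final long blockSize;
  private final List<ByteBuffer> pages = new ArrayList<>();
  private long bufferedBytes;
  private int flushCount;

  BufferedPages(IntFunction<ByteBuffer> allocator, long blockSize) {
    this.allocator = allocator;
    this.blockSize = blockSize;
  }

  /** Copies one page into a buffer from the allocator; flushes once the threshold is reached. */
  void writePage(byte[] page) {
    ByteBuffer buf = allocator.apply(page.length);
    buf.put(page);
    buf.flip();
    pages.add(buf);
    bufferedBytes += page.length;
    if (bufferedBytes >= blockSize) {
      flush();
    }
  }

  /** A real writer would hand the buffers to the file writer here; the sketch only drops them. */
  void flush() {
    pages.clear();
    bufferedBytes = 0;
    flushCount++;
  }

  /** True if the first buffered page lives in direct memory; false when nothing is buffered. */
  boolean usesDirectMemory() {
    return !pages.isEmpty() && pages.get(0).isDirect();
  }

  long bufferedBytes() {
    return bufferedBytes;
  }

  int flushCount() {
    return flushCount;
  }
}
```

Constructing it with `ByteBuffer::allocate` keeps the buffered pages on the heap, while `ByteBuffer::allocateDirect` moves them to direct memory without changing the write or flush logic, which is the kind of choice an allocator parameter is meant to offer.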


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119834619
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    --- End diff --
    
    See my answers below regarding the Parquet library's memory usage.


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119223935
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    + * It will be no need in this class once PARQUET-1006 is resolved.
    + */
    +public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
    +
    +  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
    +
    +  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap();
    +  private final MessageType schema;
    +
    +  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
    +                                          MessageType schema,
    +                                          int initialSlabSize,
    +                                          int maxCapacityHint,
    +                                          ByteBufferAllocator allocator) {
    +    this.schema = schema;
    +    for (ColumnDescriptor path : schema.getColumns()) {
    +      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator));
    +    }
    +  }
    +
    +  @Override
    +  public PageWriter getPageWriter(ColumnDescriptor path) {
    +    return writers.get(path);
    +  }
    +
    +  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
    +    for (ColumnDescriptor path : schema.getColumns()) {
    +      ColumnChunkPageWriter pageWriter = writers.get(path);
    +      pageWriter.writeToFileWriter(writer);
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    for (ColumnChunkPageWriter pageWriter : writers.values()) {
    +      pageWriter.close();
    +    }
    +  }
    +
    +  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
    +
    +    private final ColumnDescriptor path;
    +    private final BytesCompressor compressor;
    +
    +    private final CapacityByteArrayOutputStream buf;
    +    private DictionaryPage dictionaryPage;
    +
    +    private long uncompressedLength;
    +    private long compressedLength;
    +    private long totalValueCount;
    +    private int pageCount;
    +
    +    // repetition and definition level encodings are used only for v1 pages and don't change
    +    private Set<Encoding> rlEncodings = Sets.newHashSet();
    +    private Set<Encoding> dlEncodings = Sets.newHashSet();
    +    private List<Encoding> dataEncodings = Lists.newArrayList();
    +
    +    private Statistics totalStatistics;
    +
    +    private ColumnChunkPageWriter(ColumnDescriptor path,
    +                                  BytesCompressor compressor,
    +                                  int initialSlabSize,
    +                                  int maxCapacityHint,
    +                                  ByteBufferAllocator allocator) {
    +      this.path = path;
    +      this.compressor = compressor;
    +      this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
    +      this.totalStatistics = getStatsBasedOnType(this.path.getType());
    +    }
    +
    +    @Override
    +    public void writePage(BytesInput bytes,
    +                          int valueCount,
    +                          Statistics statistics,
    +                          Encoding rlEncoding,
    +                          Encoding dlEncoding,
    +                          Encoding valuesEncoding) throws IOException {
    +      long uncompressedSize = bytes.size();
    +      if (uncompressedSize > Integer.MAX_VALUE) {
    --- End diff --
    
    If the uncompressed size is anywhere near `Integer.MAX_VALUE` (2 GB), then we have a resource problem. Double that for the compressed version and we're talking about serious memory usage. And, this is per column.
    
    What is needed to ensure that each page (see [this diagram](https://parquet.apache.org/documentation/latest/)) is kept to a reasonable size? Does Drill already choose a reasonable size? Is it dependent on the (widely varying) input batch size?
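
One way to picture a tighter bound than `Integer.MAX_VALUE` is an explicit per-page limit checked before the size is narrowed to an `int`. The sketch below is an assumption for illustration only: `PageSizeGuard`, `checkedPageSize`, and the limit value are hypothetical and are not part of the diff or of the Parquet library.

```java
/** Hypothetical sketch: reject pages larger than a configured limit before narrowing to int. */
final class PageSizeGuard {
  /** Returns size as an int, or throws if it is negative or above maxPageBytes. */
  static int checkedPageSize(long size, int maxPageBytes) {
    if (size < 0 || size > maxPageBytes) {
      throw new IllegalArgumentException(
          "Page size " + size + " is outside the allowed range 0.." + maxPageBytes);
    }
    // Safe narrowing: size is known to fit because maxPageBytes is an int.
    return (int) size;
  }
}
```

With a limit in the low megabytes, an oversized page fails fast at write time rather than surfacing later as memory pressure across many columns.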


[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/846
  
    Another critical issue here. Direct memory is not a limitless resource, unfortunately. Allocating memory larger than 16 MB causes memory fragmentation as the allocation must come from the system, but all free memory is cached in Netty in 16 MB chunks. So, if the buffer size here is larger than 16 MB, the operation may fail with an OOM.
    
    If this operation must buffer in memory, and must use larger than 16 MB, then you need something like what the original code provided. That buffered writer can be backed by a chain of 16 MB direct memory blocks.
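
A buffered writer backed by a chain of fixed-size direct memory blocks, as suggested above, might be sketched like this. `ChunkedDirectOutputStream` is a hypothetical name, and the sketch uses the JDK's `ByteBuffer.allocateDirect` rather than Drill's Netty-based allocator, so it shows only the chaining pattern and not Drill's actual memory accounting. In the scenario discussed here the chunk size would be at most 16 MB (`16 * 1024 * 1024`).

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: an OutputStream that grows by adding fixed-size direct buffers. */
final class ChunkedDirectOutputStream extends OutputStream {
  private final int chunkSize;
  private final List<ByteBuffer> chunks = new ArrayList<>();
  private ByteBuffer current;

  ChunkedDirectOutputStream(int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    this.chunkSize = chunkSize;
  }

  @Override
  public void write(int b) {
    ensureRoom();
    current.put((byte) b);
  }

  @Override
  public void write(byte[] src, int off, int len) {
    // Fill the current chunk, then continue in a new one; no single allocation exceeds chunkSize.
    while (len > 0) {
      ensureRoom();
      int n = Math.min(len, current.remaining());
      current.put(src, off, n);
      off += n;
      len -= n;
    }
  }

  /** Allocates a new chunk when there is none yet or the current one is full. */
  private void ensureRoom() {
    if (current == null || !current.hasRemaining()) {
      current = ByteBuffer.allocateDirect(chunkSize);
      chunks.add(current);
    }
  }

  int chunkCount() {
    return chunks.size();
  }

  /** Total number of bytes written so far across all chunks. */
  long size() {
    long total = 0;
    for (ByteBuffer chunk : chunks) {
      total += chunk.position();
    }
    return total;
  }
}
```

Because the stream never asks for more than one chunk at a time, the total buffered size can exceed the chunk size without requiring a single large contiguous allocation.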


[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119833975
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---
    @@ -268,8 +278,7 @@ private void flush() throws IOException {
         }
     
         store.close();
    -    // TODO(jaltekruse) - review this close method should no longer be necessary
    -//    ColumnChunkPageWriteStoreExposer.close(pageStore);
    +    pageStore.close();
    --- End diff --
    
    Agree. Fixed



[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on the issue:

    https://github.com/apache/drill/pull/846
  
    @paul-rogers As I mentioned in my previous comment, the page size can't greatly exceed 1 MB (the default value of the page-size option in Drill). I checked this: the page size is almost always much less than 1 MB.
    
    The buffered data consists of all pages within one row group. When the buffered data exceeds the block size, the row group is written to disk and flushed from the stream buffer.
    This is what the current code does.
    
    I compared creating large Parquet tables with the current Drill master and with a build of Drill that includes my fix, and saw the same performance. The Drill tests also take the same time to run.
    
    The branch is rebased onto the latest Drill master.
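The row-group buffering described in this comment can be sketched roughly as follows. This is an illustration only: `RowGroupBufferSketch` and its methods are hypothetical names, pages are modeled as plain byte arrays, and the real writer hands the pages to the Parquet file writer instead of just counting flushes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: buffer pages until the row group exceeds the block size. */
final class RowGroupBufferSketch {
  private final long blockSize;
  private final List<byte[]> pages = new ArrayList<>();
  private long bufferedBytes;
  private int flushedRowGroups;

  RowGroupBufferSketch(long blockSize) {
    this.blockSize = blockSize;
  }

  /** Buffers one page and flushes the row group once the block size is exceeded. */
  void addPage(byte[] page) {
    pages.add(page);
    bufferedBytes += page.length;
    if (bufferedBytes > blockSize) {
      flush();
    }
  }

  /** Stand-in for writing the row group to disk and releasing the buffers. */
  void flush() {
    if (pages.isEmpty()) {
      return;
    }
    pages.clear();
    bufferedBytes = 0;
    flushedRowGroups++;
  }

  long bufferedBytes() {
    return bufferedBytes;
  }

  int flushedRowGroups() {
    return flushedRowGroups;
  }
}
```

Under this model the peak buffered size is bounded by roughly the block size plus one page, independent of the total amount of data written.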



[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119180018
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---
    @@ -268,8 +278,7 @@ private void flush() throws IOException {
         }
     
         store.close();
    -    // TODO(jaltekruse) - review this close method should no longer be necessary
    -//    ColumnChunkPageWriteStoreExposer.close(pageStore);
    +    pageStore.close();
    --- End diff --
    
    This is existing code, but it seems that the above code should be in a try block, with the close calls in a finally block, to ensure that the close occurs even if a write or other error occurs.
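One way the suggested try/finally shape could look, as a sketch rather than the actual `ParquetRecordWriter.flush()` code. `FlushSketch`, `IoAction`, and `flushAndClose` are hypothetical names, and both stores are modeled as `Closeable` for simplicity.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical sketch of closing both stores even when the write step fails. */
final class FlushSketch {

  /** A write step that may throw IOException. */
  interface IoAction {
    void run() throws IOException;
  }

  static void flushAndClose(IoAction write, Closeable store, Closeable pageStore)
      throws IOException {
    try {
      write.run();
    } finally {
      try {
        store.close();
      } finally {
        // Runs even if the write or store.close() threw.
        pageStore.close();
      }
    }
  }
}
```

The nested finally ensures that a failure while closing the first store does not skip closing the second, which matters when the second one holds direct memory.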



[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119836892
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    + * It will be no need in this class once PARQUET-1006 is resolved.
    + */
    +public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
    +
    +  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
    +
    +  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap();
    +  private final MessageType schema;
    +
    +  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
    +                                          MessageType schema,
    +                                          int initialSlabSize,
    +                                          int maxCapacityHint,
    +                                          ByteBufferAllocator allocator) {
    +    this.schema = schema;
    +    for (ColumnDescriptor path : schema.getColumns()) {
    +      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator));
    +    }
    +  }
    +
    +  @Override
    +  public PageWriter getPageWriter(ColumnDescriptor path) {
    +    return writers.get(path);
    +  }
    +
    +  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
    +    for (ColumnDescriptor path : schema.getColumns()) {
    +      ColumnChunkPageWriter pageWriter = writers.get(path);
    +      pageWriter.writeToFileWriter(writer);
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    for (ColumnChunkPageWriter pageWriter : writers.values()) {
    +      pageWriter.close();
    +    }
    +  }
    +
    +  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
    +
    +    private final ColumnDescriptor path;
    +    private final BytesCompressor compressor;
    +
    +    private final CapacityByteArrayOutputStream buf;
    +    private DictionaryPage dictionaryPage;
    +
    +    private long uncompressedLength;
    +    private long compressedLength;
    +    private long totalValueCount;
    +    private int pageCount;
    +
    +    // repetition and definition level encodings are used only for v1 pages and don't change
    +    private Set<Encoding> rlEncodings = Sets.newHashSet();
    +    private Set<Encoding> dlEncodings = Sets.newHashSet();
    +    private List<Encoding> dataEncodings = Lists.newArrayList();
    +
    +    private Statistics totalStatistics;
    +
    +    private ColumnChunkPageWriter(ColumnDescriptor path,
    +                                  BytesCompressor compressor,
    +                                  int initialSlabSize,
    +                                  int maxCapacityHint,
    +                                  ByteBufferAllocator allocator) {
    +      this.path = path;
    +      this.compressor = compressor;
    +      this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
    +      this.totalStatistics = getStatsBasedOnType(this.path.getType());
    +    }
    +
    +    @Override
    +    public void writePage(BytesInput bytes,
    +                          int valueCount,
    +                          Statistics statistics,
    +                          Encoding rlEncoding,
    +                          Encoding dlEncoding,
    +                          Encoding valuesEncoding) throws IOException {
    +      long uncompressedSize = bytes.size();
    +      if (uncompressedSize > Integer.MAX_VALUE) {
    --- End diff --
    
    We should throw an exception here, because Parquet creates bad metadata if the uncompressed or compressed size of a page exceeds Integer.MAX_VALUE.
    But with Drill we can't get there with the default Parquet options (PARQUET_BLOCK_SIZE, PARQUET_PAGE_SIZE, MINIMUM_RECORD_COUNT_FOR_CHECK).
    The page size is controlled by the method [ColumnWriterV1.accountForValueWritten()](https://github.com/apache/parquet-mr/blob/70f28810a5547219e18ffc3465f519c454fee6e5/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java#L95). According to it, once the page size exceeds the page-size option value (1 MB by default in Drill), the page is written.
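The two checks discussed here can be sketched as below. This is a simplified illustration under stated assumptions, not the Parquet code: `PageSizeSketch` and its methods are hypothetical, the threshold test omits the value-count heuristics that the real `accountForValueWritten()` applies, and `IllegalStateException` stands in for the `ParquetEncodingException` imported in the diff.

```java
/** Hypothetical sketch of the page-size threshold and the int-range guard. */
final class PageSizeSketch {
  /** 1 MB, the Drill default page size mentioned in the comment above. */
  static final int DEFAULT_PAGE_SIZE = 1024 * 1024;

  /** A page is written once the buffered bytes exceed the page-size threshold. */
  static boolean shouldWritePage(long bufferedBytes, long pageSizeThreshold) {
    return bufferedBytes > pageSizeThreshold;
  }

  /** Rejects sizes that cannot be stored in the int-typed page metadata. */
  static int checkedPageSize(long size) {
    if (size > Integer.MAX_VALUE) {
      throw new IllegalStateException(
          "Cannot write page larger than Integer.MAX_VALUE bytes: " + size);
    }
    return (int) size;
  }
}
```

With a 1 MB threshold the guard should never fire in practice, but failing loudly seems preferable to silently writing truncated sizes.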



[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r119181468
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    Please explain, perhaps in the JIRA or in a code comment, the benefit of using a ByteBuffer here. Here is my question. Data comes from value vectors, where it is stored as direct memory. But, individual values have to be retrieved from the vector and converted to some other form: perhaps an int, a double or however Parquet represents strings.
    
    Parquet will then work its magic on the values: compressing them, dictionary encoding them, etc.
    
    Finally, the resulting buffers are written to disk. If I/O is synchronous, data comes from a heap buffer. If async, then from direct.
    
    What is the benefit of introducing ByteBuffer allocators here? Are we doing that to use direct memory? What is the advantage of copying Parquet's likely heap buffers into direct memory for buffering prior to writing to a file?
    
    Said another way, why can't we just use the original Parquet version of this code?
    
    Some bit of explanation would be very helpful for us reviewers...



[GitHub] drill pull request #846: DRILL-5544: Out of heap running CTAS against text d...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/846#discussion_r120453820
  
    --- Diff: exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.parquet.hadoop;
    +
    +import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    +import org.apache.parquet.bytes.BytesInput;
    +import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.column.Encoding;
    +import org.apache.parquet.column.page.DictionaryPage;
    +import org.apache.parquet.column.page.PageWriteStore;
    +import org.apache.parquet.column.page.PageWriter;
    +import org.apache.parquet.column.statistics.Statistics;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    +import org.apache.parquet.io.ParquetEncodingException;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.bytes.ByteBufferAllocator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
    + * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
    --- End diff --
    
    I'm not sure that 20 GB is "OK" to write to a file: whether on the heap or otherwise. But, if we accept the poor design that needs this much, then I agree it is better to have uncontrolled use of direct memory than the heap. But, in either case, uncontrolled memory use is a very bad idea.
    
    Note that the only buffering needed is 4K: the disk block size. Experiments with spilling show that there is zero benefit to buffering above 32K.
    
    So, for 10 runs * 32K = 320K. This is FAR less than the 20 GB proposed, and plenty for heap, to avoid the cost of copying data into, then out of, direct memory.
    
    Further, what is wrong with Java's own `BufferedOutputStream`? Was this tried? It works well for simple cases, not sure about more complex cases.
    
    Is the problem that Parquet must somehow buffer an entire page so it can "backpatch" the first bytes after writing all the rest? If so, that part of the Parquet design is incompatible with a write-once file system such as HDFS: it forces unnecessary load on the memory system. (Note, however, that an updatable file system, such as Linux or MFS, would allow us to write data to disk, then go back and write the header, without buffering.)
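For reference, the `BufferedOutputStream` alternative raised above might look like the following minimal sketch. It assumes the data can be streamed straight to the sink with no backpatching; `BufferedWriteSketch` and `writeChunks` are hypothetical names, and the 32K buffer size is taken from the spilling experiments mentioned in this comment.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch: stream chunks through a small fixed-size heap buffer. */
final class BufferedWriteSketch {
  /** 32K, the size beyond which the comment reports no further benefit. */
  static final int BUFFER_SIZE = 32 * 1024;

  /** Writes the same chunk `repeat` times, then flushes and closes the sink. */
  static void writeChunks(OutputStream sink, byte[] chunk, int repeat) throws IOException {
    try (OutputStream out = new BufferedOutputStream(sink, BUFFER_SIZE)) {
      for (int i = 0; i < repeat; i++) {
        out.write(chunk);
      }
    } // try-with-resources flushes any buffered bytes and closes the sink
  }
}
```

Memory use here stays at one 32K buffer per open stream regardless of how much data passes through, which is the contrast being drawn with buffering whole row groups.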



[GitHub] drill issue #846: DRILL-5544: Out of heap running CTAS against text delimite...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/846
  
    Given that the page sizes are small (~1 MB), the solution simply transfers a modest amount of memory from heap to direct. So, LGTM.
    
    +1

