Posted to jira@arrow.apache.org by "Will Jones (Jira)" <ji...@apache.org> on 2022/01/13 22:35:00 UTC

[jira] [Commented] (ARROW-14047) [C++] [Parquet] FileReader returns inconsistent results on repeat reads

    [ https://issues.apache.org/jira/browse/ARROW-14047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475812#comment-17475812 ] 

Will Jones commented on ARROW-14047:
------------------------------------

I haven't been able to reproduce this issue yet, because I am instead getting segfaults. Unfortunately, those don't seem particularly related to your issue, so I may file separate tickets for what I'm encountering and see if someone else can help reproduce this one.

I tried reproducing the issue in Python (hoping for a quicker repro) with the following snippet:

{code:python}
import pyarrow.parquet as pq

path = "writeReadRowGroup.parquet"
# Low-level ParquetReader underlying ParquetFile, so every read goes
# through the same reader instance, as in the original C++ repro.
reader = pq.ParquetFile(path).reader
# Read the whole file 20 times through that one reader.
tables = [reader.read_all() for _ in range(20)]

# Check that every re-read of the "recordList" column matches the first read.
all(tables[0].column("recordList") == table.column("recordList") for table in tables)
{code}

But that segfaults with the following native stack trace:

{code}
_mi_heap_malloc_zero (@_mi_heap_malloc_zero:20)
arrow::BaseMemoryPoolImpl<arrow::(anonymous namespace)::MimallocAllocator>::Allocate(long long, unsigned char**) (@arrow::BaseMemoryPoolImpl<arrow::(anonymous namespace)::MimallocAllocator>::Allocate(long long, unsigned char**):22)
arrow::PoolBuffer::Reserve(long long) (@arrow::PoolBuffer::Reserve(long long):74)
arrow::PoolBuffer::Resize(long long, bool) (@arrow::PoolBuffer::Resize(long long, bool):31)
arrow::AllocateResizableBuffer(long long, arrow::MemoryPool*) (@arrow::AllocateResizableBuffer(long long, arrow::MemoryPool*):21)
parquet::arrow::(anonymous namespace)::StructReader::BuildArray(long long, std::__1::shared_ptr<arrow::ChunkedArray>*) (@parquet::arrow::(anonymous namespace)::StructReader::BuildArray(long long, std::__1::shared_ptr<arrow::ChunkedArray>*):88)
parquet::arrow::(anonymous namespace)::ListReader<int>::BuildArray(long long, std::__1::shared_ptr<arrow::ChunkedArray>*) (@parquet::arrow::(anonymous namespace)::ListReader<int>::BuildArray(long long, std::__1::shared_ptr<arrow::ChunkedArray>*):125)
parquet::arrow::ColumnReaderImpl::NextBatch(long long, std::__1::shared_ptr<arrow::ChunkedArray>*) (@parquet::arrow::ColumnReaderImpl::NextBatch(long long, std::__1::shared_ptr<arrow::ChunkedArray>*):30)
parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadColumn(int, std::__1::vector<int, std::__1::allocator<int> > const&, parquet::arrow::ColumnReader*, std::__1::shared_ptr<arrow::ChunkedArray>*) (@parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadColumn(int, std::__1::vector<int, std::__1::allocator<int> > const&, parquet::arrow::ColumnReader*, std::__1::shared_ptr<arrow::ChunkedArray>*):77)
parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::__1::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::__1::vector<int, std::__1::allocator<int> > const&, std::__1::vector<int, std::__1::allocator<int> > const&, arrow::internal::Executor*)::$_4::operator()(unsigned long, std::__1::shared_ptr<parquet::arrow::ColumnReaderImpl>) const (@parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::__1::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::__1::vector<int, std::__1::allocator<int> > const&, std::__1::vector<int, std::__1::allocator<int> > const&, arrow::internal::Executor*)::$_4::operator()(unsigned long, std::__1::shared_ptr<parquet::arrow::ColumnReaderImpl>) const:19)
arrow::internal::FnOnce<void ()>::FnImpl<std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<std::__1::shared_ptr<arrow::ChunkedArray> >&, parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::__1::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::__1::vector<int, std::__1::allocator<int> > const&, std::__1::vector<int, std::__1::allocator<int> > const&, arrow::internal::Executor*)::$_4&, unsigned long&, std::__1::shared_ptr<parquet::arrow::ColumnReaderImpl> > >::invoke() (@arrow::internal::FnOnce<void ()>::FnImpl<std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<std::__1::shared_ptr<arrow::ChunkedArray> >&, parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::__1::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::__1::vector<int, std::__1::allocator<int> > const&, std::__1::vector<int, std::__1::allocator<int> > const&, arrow::internal::Executor*)::$_4&, unsigned long&, std::__1::shared_ptr<parquet::arrow::ColumnReaderImpl> > >::invoke():31)
void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3> >(void*) (@void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3> >(void*):182)
_pthread_start (@_pthread_start:40)
{code}
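
Since that stack bottoms out in the mimalloc-backed memory pool, one quick check (just a sketch; I haven't verified it changes anything) is to route allocations through the system allocator and see whether the segfault follows the allocator rather than the Parquet reader:

{code:python}
import pyarrow as pa
import pyarrow.parquet as pq

# Switch the default pool away from mimalloc before reading. The same can be
# done with the ARROW_DEFAULT_MEMORY_POOL=system environment variable.
pa.set_memory_pool(pa.system_memory_pool())

path = "writeReadRowGroup.parquet"
reader = pq.ParquetFile(path).reader
tables = [reader.read_all() for _ in range(20)]
{code}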

In C++, I'm running into a different problem:

{code:cpp}
#include <filesystem>
#include <iostream>
#include <memory>
#include <stdexcept>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/exception.h>  // PARQUET_ASSIGN_OR_THROW

using namespace std;
using namespace std::__fs;

arrow::Status inner_main() {
    filesystem::path filePath = "writeReadRowGroup.parquet";
    arrow::MemoryPool *pool = arrow::default_memory_pool();
    std::shared_ptr<arrow::io::ReadableFile> infile;
    PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, pool));
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    auto status = parquet::arrow::OpenFile(infile, pool, &arrow_reader); // segfaults here
    ARROW_RETURN_NOT_OK(status);
    std::shared_ptr<arrow::Schema> readSchema;
    ARROW_RETURN_NOT_OK(arrow_reader->GetSchema(&readSchema));
    std::shared_ptr<arrow::Table> table;
    std::vector<int> indicesToGet;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    // Isolate the recordList column as its own single-column table.
    auto recordListCol1 = arrow::Table::Make(arrow::schema({table->schema()->GetFieldByName("recordList")}),
                                             {table->GetColumnByName("recordList")});
    // Re-read the whole file 20 times and compare recordList against the first read.
    for (int i = 0; i < 20; ++i)
    {
        cout << "data reread operation number = " + std::to_string(i) << endl;
        std::shared_ptr<arrow::Table> table2;
        ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table2));
        auto recordListCol2 = arrow::Table::Make(arrow::schema({table2->schema()->GetFieldByName("recordList")}),
                                                 {table2->GetColumnByName("recordList")});
        bool equals = recordListCol1->Equals(*recordListCol2);
        if (!equals)
        {
            cout << recordListCol1->ToString() << endl;
            cout << endl
                 << "new table" << endl;
            cout << recordListCol2->ToString() << endl;
            throw std::runtime_error("Subsequent re-read failure ");
        }
    }

    return arrow::Status::OK();
}

int main()
{
    auto status = inner_main();
    return status.ok() ? 0 : 1;
}
{code}

Call stack from the C++ crash:

{code}
__pthread_kill (@__pthread_kill:5)
pthread_kill (@pthread_kill:75)
abort (@abort:44)
malloc_vreport (@has_default_zone0:3)
malloc_report (@malloc_report:19)
free (@free:128)
apache::thrift::transport::TMemoryBuffer::~TMemoryBuffer() (@apache::thrift::transport::TMemoryBuffer::~TMemoryBuffer():14)
void parquet::DeserializeThriftUnencryptedMsg<parquet::format::FileMetaData>(unsigned char const*, unsigned int*, parquet::format::FileMetaData*) (@void parquet::DeserializeThriftUnencryptedMsg<parquet::format::FileMetaData>(unsigned char const*, unsigned int*, parquet::format::FileMetaData*):104)
parquet::FileMetaData::FileMetaDataImpl::FileMetaDataImpl(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>) (@parquet::FileMetaData::FileMetaDataImpl::FileMetaDataImpl(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>):100)
parquet::FileMetaData::FileMetaData(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>) (@parquet::FileMetaData::FileMetaData(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>):27)
parquet::FileMetaData::Make(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>) (@parquet::FileMetaData::Make(void const*, unsigned int*, std::__1::shared_ptr<parquet::InternalFileDecryptor>):27)
parquet::SerializedFile::ParseUnencryptedFileMetadata(std::__1::shared_ptr<arrow::Buffer> const&, unsigned int) (@parquet::SerializedFile::ParseUnencryptedFileMetadata(std::__1::shared_ptr<arrow::Buffer> const&, unsigned int):31)
parquet::SerializedFile::ParseMetaData() (@parquet::SerializedFile::ParseMetaData():217)
parquet::ParquetFileReader::Contents::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>) (@parquet::ParquetFileReader::Contents::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>):56)
parquet::ParquetFileReader::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>) (@parquet::ParquetFileReader::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>):18)
parquet::arrow::FileReaderBuilder::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>) (@parquet::arrow::FileReaderBuilder::Open(std::__1::shared_ptr<arrow::io::RandomAccessFile>, parquet::ReaderProperties const&, std::__1::shared_ptr<parquet::FileMetaData>):22)
parquet::arrow::OpenFile(std::__1::shared_ptr<arrow::io::RandomAccessFile>, arrow::MemoryPool*, std::__1::unique_ptr<parquet::arrow::FileReader, std::__1::default_delete<parquet::arrow::FileReader> >*) (@parquet::arrow::OpenFile(std::__1::shared_ptr<arrow::io::RandomAccessFile>, arrow::MemoryPool*, std::__1::unique_ptr<parquet::arrow::FileReader, std::__1::default_delete<parquet::arrow::FileReader> >*):32)
inner_main() (/Users/willjones/Documents/arrow_cpp_example/example.cc:18)
main (/Users/willjones/Documents/arrow_cpp_example/example.cc:50)
start (@start:133)
{code}
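
That crash happens while Thrift-deserializing the file footer in ParseMetaData, before any column data is read, so it may say more about my local build and linkage than about the file itself. As a cross-check (just a sketch using the released pyarrow wheel), parsing the footer on its own should tell whether the metadata itself is the problem:

{code:python}
import pyarrow.parquet as pq

# Parse only the Parquet footer (the same FileMetaData the crash above is
# deserializing). If this succeeds with the released wheel, the double free
# is more likely a local build/link issue than bad metadata in the file.
meta = pq.read_metadata("writeReadRowGroup.parquet")
print(meta)
{code}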


> [C++] [Parquet] FileReader returns inconsistent results on repeat reads
> -----------------------------------------------------------------------
>
>                 Key: ARROW-14047
>                 URL: https://issues.apache.org/jira/browse/ARROW-14047
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>    Affects Versions: 5.0.0
>         Environment: Centos 7 gcc 9.2.0
>            Reporter: Radu Teodorescu
>            Assignee: Will Jones
>            Priority: Major
>         Attachments: Capture.PNG, writeReadRowGroup.parquet
>
>
> We are seeing that for certain data sets when dealing with lists of structs, repeated reads yield different results - I have a file that exhibits this behavior and below is the code for reproducing it:
> {code:java}
>   filesystem::path filePath = dirPath / "writeReadRowGroup.parquet";
>   arrow::MemoryPool *pool = arrow::default_memory_pool();
>   std::shared_ptr<arrow::io::ReadableFile> infile;
>   PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, pool));
>   std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
>   auto status = parquet::arrow::OpenFile(infile, pool, &arrow_reader);
>   CHECK_OK(status);
>   std::shared_ptr<arrow::Schema> readSchema;
>   CHECK_OK(arrow_reader->GetSchema(&readSchema));
>   std::shared_ptr<arrow::Table> table;
>   std::vector<int> indicesToGet;
>   CHECK_OK(arrow_reader->ReadTable(&table));
>   auto recordListCol1 = arrow::Table::Make(arrow::schema({table->schema()->GetFieldByName("recordList")}),
>                                            {table->GetColumnByName("recordList")});
>   for (int i = 0; i < 20; ++i) {
>     cout << "data reread operation number = " + std::to_string(i) << endl;
>     std::shared_ptr<arrow::Table> table2;
>     CHECK_OK(arrow_reader->ReadTable(&table2));
>     auto recordListCol2 = arrow::Table::Make(arrow::schema({table2->schema()->GetFieldByName("recordList")}),
>                                              {table2->GetColumnByName("recordList")});
>     bool equals = recordListCol1->Equals(*recordListCol2);
>     if (!equals) {
>       cout << recordListCol1->ToString() << endl;
>       cout << endl << "new table" << endl;
>       cout << recordListCol2->ToString() << endl;
>       throw std::runtime_error("Subsequent re-read failure ");
>     }
>   }
> {code}
> Apparently, as shown in the attached capture, the state machine used to track nulls is broken on subsequent usage.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)