Posted to dev@arrow.apache.org by Rares Vernica <rv...@gmail.com> on 2018/04/17 01:52:32 UTC

C++ RecordBatchWriter/ReadRecordBatch clarification

Hi,

I'm writing a batch of records to a stream and I want to read them later. I
noticed that if I use the RecordBatchStreamWriter class to write them and
then the ReadRecordBatch function to read them, I get a segmentation fault.

On the other hand, if I use the RecordBatchFileWriter class to write them,
the reading works fine.

So, is the arrow::ipc::ReadRecordBatch function intended to only work if
the records were written by RecordBatchFileWriter?

Below is a complete example showing the two cases. I tried this on Ubuntu
Trusty with Arrow 0.9.0-1.

Thanks!
Rares





// g++-4.9 -ggdb -std=c++11 foo.cpp -larrow

#include <arrow/builder.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

int main()
{
    arrow::MemoryPool* pool = arrow::default_memory_pool();

    std::shared_ptr<arrow::PoolBuffer> buffer(new arrow::PoolBuffer(pool));

    arrow::Int64Builder builder(pool);
    builder.Append(1);

    std::shared_ptr<arrow::Array> array;
    builder.Finish(&array);

    std::vector<std::shared_ptr<arrow::Field>> schema_vector =
        {arrow::field("id", arrow::int64())};

    auto schema = std::make_shared<arrow::Schema>(schema_vector);


    // Write
    std::shared_ptr<arrow::RecordBatch> batchOut;
    batchOut = arrow::RecordBatch::Make(schema, 10, {array});

    std::unique_ptr<arrow::io::BufferOutputStream> stream;
    stream.reset(new arrow::io::BufferOutputStream(buffer));

    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;

    // #1 - Segmentation fault (core dumped)
    arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema, &writer);

    // #2 - OK
    //arrow::ipc::RecordBatchFileWriter::Open(stream.get(), schema, &writer);

    writer->WriteRecordBatch(*batchOut);

    writer->Close();
    stream->Close();


    // Read
    arrow::io::BufferReader reader(buffer);
    std::shared_ptr<arrow::RecordBatch> batchIn;
    arrow::ipc::ReadRecordBatch(schema, &reader, &batchIn);
}

Re: C++ RecordBatchWriter/ReadRecordBatch clarification

Posted by Wes McKinney <we...@gmail.com>.
> So, I guess the ReadRecordBatch function is intended to only work if the records were written by RecordBatchFileWriter, right?

The *StreamWriter and *FileWriter classes use identical code paths for
writing the IPC messages. The only differences are the file preamble (the
magic number and padding) and the file footer written at the end; see

https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L919

https://github.com/apache/arrow/blob/master/format/IPC.md#file-format

I'm looking at your code in
https://github.com/Paradigm4/accelerated_io_tools/blob/master/src/PhysicalAioSave.cpp#L1547
 -- it is not going to work because you wrote a stream that includes
the schema as the first message. If you use
arrow::ipc::WriteRecordBatch instead, then things will work fine.
Another option is to use the generic ReadMessage function twice,
skipping the schema message if you don't need it.

Hope this helps
Wes
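
To make the point about the leading schema message concrete, here is a
minimal toy model in plain C++. It does not use Arrow at all: ToyStream,
ToyMessage, write_stream, write_batch_only, read_message and
read_record_batch are hypothetical names invented for this sketch, and the
real IPC framing is more involved. The sketch only assumes what is stated
above, namely that a stream writer emits the schema as its first message
and that a bare record batch read expects a record batch message at the
current position.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model only: none of these types are part of Arrow's API.
enum class ToyMessageType { Schema, RecordBatch };

struct ToyMessage {
    ToyMessageType type;
    std::string body;
};

// A "stream" here is an ordered list of messages plus a read cursor.
struct ToyStream {
    std::vector<ToyMessage> messages;
    std::size_t pos = 0;
};

// Mimics a stream writer: the schema goes first, then every batch.
ToyStream write_stream(const std::string& schema,
                       const std::vector<std::string>& batches) {
    ToyStream s;
    s.messages.push_back({ToyMessageType::Schema, schema});
    for (const auto& b : batches) {
        s.messages.push_back({ToyMessageType::RecordBatch, b});
    }
    return s;
}

// Mimics writing a single batch with no schema message in front of it.
ToyStream write_batch_only(const std::string& batch) {
    ToyStream s;
    s.messages.push_back({ToyMessageType::RecordBatch, batch});
    return s;
}

// Generic read: returns the next message whatever its type,
// or false when the stream is exhausted.
bool read_message(ToyStream* s, ToyMessage* out) {
    assert(s != nullptr && out != nullptr);
    if (s->pos >= s->messages.size()) return false;
    *out = s->messages[s->pos++];
    return true;
}

// Typed read: succeeds only if the next message is a record batch.
// On a type mismatch it returns false and leaves the cursor untouched.
bool read_record_batch(ToyStream* s, std::string* out) {
    assert(s != nullptr && out != nullptr);
    if (s->pos >= s->messages.size()) return false;
    const ToyMessage& m = s->messages[s->pos];
    if (m.type != ToyMessageType::RecordBatch) return false;
    *out = m.body;
    ++s->pos;
    return true;
}
```

In this model, calling read_record_batch directly on the result of
write_stream fails because the cursor sits on the schema message. Calling
read_message once to skip the schema and then read_record_batch succeeds,
as does read_record_batch on the result of write_batch_only. The toy
reports the mismatch with a return value; it makes no claim about how the
real library behaves on a mismatch.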

On Sun, Apr 22, 2018 at 2:33 PM, Rares Vernica <rv...@gmail.com> wrote:
> Hi Dimitri,
>
> Thanks for that. I was doing something like that. My hope was to use
> StreamWriter to write the batch and then use ReadRecordBatch to read it
> since it is more succinct and I know I only have one batch to read.
>
> Here is the actual code I use
> https://github.com/Paradigm4/accelerated_io_tools/blob/master/src/PhysicalAioSave.cpp#L1547
> and above it, commented out, is the code I would like to use.
>
> So, I guess the ReadRecordBatch function is intended to only work if the
> records were written by RecordBatchFileWriter, right?
>
> Cheers,
> Rares
>
>
> On Tue, Apr 17, 2018 at 1:00 AM, Dimitri Vorona <al...@googlemail.com>
> wrote:
>
>> Hi Rares,
>>
>> you need to use a different reader for RecordBatch streams. See
>> arrow/ipc/ipc-read-write-test.cc:569-596 for the gist. Also, the second
>> argument to arrow::RecordBatch::Make is the number of rows in the batch,
>> so you have to set it to 1 in your example.
>>
>> See https://gist.github.com/alendit/c6cdd1adaf7007786392731152d3b6b9
>>
>> Cheers,
>> Dimitri.

Re: C++ RecordBatchWriter/ReadRecordBatch clarification

Posted by Rares Vernica <rv...@gmail.com>.
Hi Dimitri,

Thanks for that. I was doing something like that. My hope was to use
StreamWriter to write the batch and then use ReadRecordBatch to read it
since it is more succinct and I know I only have one batch to read.

Here is the actual code I use
https://github.com/Paradigm4/accelerated_io_tools/blob/master/src/PhysicalAioSave.cpp#L1547
and above it, commented out, is the code I would like to use.

So, I guess the ReadRecordBatch function is intended to only work if the
records were written by RecordBatchFileWriter, right?

Cheers,
Rares


On Tue, Apr 17, 2018 at 1:00 AM, Dimitri Vorona <al...@googlemail.com>
wrote:

> Hi Rares,
>
> you need to use a different reader for RecordBatch streams. See
> arrow/ipc/ipc-read-write-test.cc:569-596 for the gist. Also, the second
> argument to arrow::RecordBatch::Make is the number of rows in the batch,
> so you have to set it to 1 in your example.
>
> See https://gist.github.com/alendit/c6cdd1adaf7007786392731152d3b6b9
>
> Cheers,
> Dimitri.

Re: C++ RecordBatchWriter/ReadRecordBatch clarification

Posted by Dimitri Vorona <al...@googlemail.com>.
Hi Rares,

you need to use a different reader for RecordBatch streams. See
arrow/ipc/ipc-read-write-test.cc:569-596 for the gist. Also, the second
argument to arrow::RecordBatch::Make is the number of rows in the batch,
so you have to set it to 1 in your example.

See https://gist.github.com/alendit/c6cdd1adaf7007786392731152d3b6b9

Cheers,
Dimitri.
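
The row-count remark can be illustrated with a small consistency check.
This is a plain C++ sketch, not Arrow code: ToyBatch and
row_count_matches are hypothetical names made up for illustration. It
only encodes the rule stated above, that the declared number of rows has
to agree with the length of every column placed in the batch.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy stand-in for a record batch (not Arrow's class): a declared
// row count plus the columns that are supposed to hold that many values.
struct ToyBatch {
    std::int64_t num_rows;
    std::vector<std::vector<std::int64_t>> columns;
};

// Returns true only if every column holds exactly num_rows values.
bool row_count_matches(const ToyBatch& batch) {
    assert(batch.num_rows >= 0);
    for (const auto& column : batch.columns) {
        if (static_cast<std::int64_t>(column.size()) != batch.num_rows) {
            return false;
        }
    }
    return true;
}
```

With one appended value and a declared count of 10, as in the example
program, this check fails; with a declared count of 1 it passes.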
