Posted to user@arrow.apache.org by L Ait <lh...@gmail.com> on 2022/07/02 14:04:45 UTC

StreamReader

Hi,

I need help integrating Arrow C++ into my current project. I have
built the C++ library and can call its API.

Here is what I need:

I have a C++ project that reads data in chunks, then uses an erasure
code to rebuild the original data.

The rebuild is done in chunks; at each iteration I can access a buffer
of rebuilt data.

I need to pass this data as a stream to an Arrow process, then send on
the processed stream.

For example, if my original file is a CSV and I would like to filter
it and save the first column:

file

col1,col2,col3,col4
a1,b1,c1,d1
an,bn,cn,dn

split into 6 chunks of equal size.

chunk1:

a1,b1,c1,d1
ak,bk

chunk2:

ck,dk
...
am,bm,cm,dm

and so on.

My question is: which StreamReader in Arrow should I use, and how does
it deal with incomplete records (lines) at the beginning and end of
each chunk?
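One way to handle these incomplete records, independent of which Arrow reader is chosen, is to reassemble complete lines before the data reaches Arrow: hold back the truncated tail of each chunk and prepend it to the next one. A minimal sketch in plain C++ (no Arrow dependency; `LineAssembler` and its method names are hypothetical, not part of any library):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: accumulates raw chunks and emits only complete
// lines; the incomplete tail of a chunk is held back and prepended to
// the next chunk.
class LineAssembler {
 public:
  // Feed one chunk; returns the complete lines it closed (without '\n').
  std::vector<std::string> Feed(const std::string& chunk) {
    pending_ += chunk;
    std::vector<std::string> lines;
    std::size_t start = 0;
    std::size_t nl;
    while ((nl = pending_.find('\n', start)) != std::string::npos) {
      lines.push_back(pending_.substr(start, nl - start));
      start = nl + 1;
    }
    pending_.erase(0, start);  // keep only the incomplete tail
    return lines;
  }

  // Call after the last chunk: the final line may lack a trailing '\n'.
  std::string Finish() {
    std::string tail;
    tail.swap(pending_);
    return tail;
  }

 private:
  std::string pending_;
};
```

Each call to Feed() returns only the lines that the new chunk completed; Finish() returns whatever remains after the last chunk, which may be a final line without a trailing newline.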

Here is a snippet of the code I use:

buffer_type_t res = fut.get0();
BOOST_LOG_TRIVIAL(trace) <<
  "RawxBackendReader: Got result with buffer size: " << res.size();

// Wrap the rebuilt buffer in an Arrow input stream.
std::shared_ptr<arrow::io::InputStream> input =
  std::make_shared<arrow::io::BufferReader>(
    reinterpret_cast<const uint8_t*>(res.get()), res.size());
BOOST_LOG_TRIVIAL(trace) << "input stream: " << input.get();

ArrowFilter arrow_filter = ArrowFilter(input);
arrow_filter.ToCsv();

result.push_back(std::move(res));

Thank you

Re: StreamReader

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

I can't understand why you want to mix dev@ and user@
mailing lists... Anyway...


Sorry, I misunderstood. I thought that your input was in the Apache
Arrow IPC format and your output was CSV. You can't use
arrow::ipc::RecordBatchStreamReader for CSV. You need to use
arrow::csv::StreamingReader:

  buffer_type_t res = fut.get0();
  BOOST_LOG_TRIVIAL(trace) <<
    "RawxBackendReader: Got result with buffer size: " << res.size();
  auto input = std::make_shared<arrow::io::BufferReader>(
    reinterpret_cast<const uint8_t*>(res.get()),
    res.size());
  BOOST_LOG_TRIVIAL(trace) << "input stream: " << input.get();
  auto io_context = arrow::io::IOContext(arrow::default_memory_pool());
  auto read_options = arrow::csv::ReadOptions::Defaults();
  auto parse_options = arrow::csv::ParseOptions::Defaults();
  auto convert_options = arrow::csv::ConvertOptions::Defaults();
  auto reader_result =
    arrow::csv::StreamingReader::Make(io_context,
                                      input,
                                      read_options,
                                      parse_options,
                                      convert_options);
  if (!reader_result.ok()) {
    exit(1);
  }
  auto reader = *reader_result;
  for (auto record_batch_result : *reader) {
    if (!record_batch_result.ok()) {
      exit(1);
    }
    auto record_batch = *record_batch_result;
    // Filter record_batch and write CSV.
    // You can use arrow::csv::MakeCSVWriter() to write a CSV.
  }

  result.push_back(std::move(res));
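The ok()-then-dereference flow in the snippet above follows Arrow's arrow::Result<T> convention. For illustration only, the same control flow can be mimicked in a few lines of plain C++ (`MiniResult` and `ParsePositive` are hypothetical stand-ins, not Arrow classes):

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Result<T>: holds either a value or
// an error message; callers test ok() before dereferencing.
template <typename T>
class MiniResult {
 public:
  static MiniResult Value(T v) { return MiniResult(true, std::move(v), ""); }
  static MiniResult Error(std::string msg) {
    return MiniResult(false, T{}, std::move(msg));
  }
  bool ok() const { return ok_; }
  const T& operator*() const { return value_; }
  const std::string& error() const { return error_; }

 private:
  MiniResult(bool ok, T v, std::string e)
      : ok_(ok), value_(std::move(v)), error_(std::move(e)) {}
  bool ok_;
  T value_;
  std::string error_;
};

// Example producer: fails on non-positive input.
MiniResult<int> ParsePositive(int raw) {
  if (raw <= 0) return MiniResult<int>::Error("not positive");
  return MiniResult<int>::Value(raw);
}
```

The same "check ok(), then unwrap with operator*" shape is what the StreamingReader code relies on, both for the reader itself and for each record batch.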


Thanks,
-- 
kou


Re: StreamReader

Posted by L Ait <lh...@gmail.com>.
Hey,

I tested the suggestion here, adapting the code to read the stream
from CSV format.
But in my tests the method OnRecordBatchDecoded is never called; my
understanding is that it waits for the IPC format while I am reading
CSV format?

Am I missing something?

In the meantime, in order to reply to this thread, do I only need to
reply to dev@arrow.apache.org, or resend my reply to
https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
on dev@ to connect the existing thread?

class MyListener : public arrow::ipc::Listener {
  public:
    arrow::Status
    OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> record_batch)
    override {
      ArrowFilter arrow_filter = ArrowFilter(record_batch);
      arrow_filter.ToCsv();
      return arrow::Status::OK();
    }
};


Thanks



Re: StreamReader

Posted by Sutou Kouhei <ko...@clear-code.com>.
Could you resend your reply to
https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
on dev@ to connect the existing thread?


Re: StreamReader

Posted by L Ait <lh...@gmail.com>.
Thank you, I will look into that.
The real problem is that I read data in chunks and the end of a chunk
is truncated (not a complete line). I need to wait for the next chunk
to complete the line.

Is there a way you would suggest to process the chunks smoothly?

Thank you
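For what it's worth, the "wait for the next chunk to complete the line" step can be sketched without Arrow at all: accumulate each chunk into a pending buffer, process only the complete lines, and carry the truncated tail forward. Applied to the first-column extraction from the original example (plain C++; `FirstColumn` is a hypothetical helper, and CSV quoting is ignored for brevity):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Extract the first CSV field of each complete line in a stream of
// chunks, holding back the incomplete tail line between chunks.
// The header row is treated like any other line here.
std::vector<std::string> FirstColumn(const std::vector<std::string>& chunks) {
  std::vector<std::string> out;
  std::string pending;
  auto emit = [&](const std::string& line) {
    if (!line.empty()) out.push_back(line.substr(0, line.find(',')));
  };
  for (const auto& chunk : chunks) {
    pending += chunk;
    std::size_t start = 0;
    std::size_t nl;
    while ((nl = pending.find('\n', start)) != std::string::npos) {
      emit(pending.substr(start, nl - start));  // a complete line
      start = nl + 1;
    }
    pending.erase(0, start);  // carry the truncated tail forward
  }
  emit(pending);  // final line may have no trailing newline
  return out;
}
```

A chunk boundary that falls mid-line simply leaves that partial line in `pending` until the next chunk supplies the rest.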



Re: StreamReader

Posted by Sutou Kouhei <ko...@clear-code.com>.
Answered on dev@: https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
