You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@arrow.apache.org by Rares Vernica <rv...@gmail.com> on 2018/02/26 15:22:58 UTC

C++ optimize stream output

Hello,

I am using the C++ API to serialize and centralize data over the network. I
am wondering if I am using the API in an efficient way.

I have multiple nodes and a coordinator communicating over the network. I
do not have fine control over the network communication. Individual nodes
write one chunk of data to the network. The coordinator will receive all
the chunks and can loop over them.

On each node I do the following (the code is here
<https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L512>
):

   1. Append data to Builders
   2. Finish Builders and get Arrays
   3. Create Record Batch from Arrays
   4. Create Pool Buffer
   5. Create Buffer Output Stream using Pool Buffer
   6. Open Record Batch Stream Writer using Buffer Output Stream
   7. Write Record Batch to writer
   8. Write Buffer data to network

On the coordinator I do the following (the code is here
<https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1000>
):

   1. Open File Output Stream
   2. Open Record Batch Stream Writer using File Output Stream
   3. For each chunk retrieved from the network
      1. Create Buffer Reader using data retrieved from the network (the
      code is here
      <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1054>
      )
      2. Create Record Batch Stream Reader using Buffer Reader and read
      Record Batch (I plan to use ReadRecordBatch, but I'm having
issues like in
      ARROW-2189)
      3. Write Record Batch

A few questions:

   - Does this look good decent? Could the API be used in more efficient
   ways in order to achieve the same goal?
   - On each node side, do steps #3 and #7 copy data?
   - On the coordinator side, do steps #3.2, and #3.3 copy data?
   - On the coordinator side, do I really need to read and write a record
   batch? Could I copy the buffer directly somehow?

Thank you so much!
Rares



(Could the same Pool Buffer be reused across calls?)

Re: C++ optimize stream output

Posted by Rares Vernica <rv...@gmail.com>.
Hi Wes,

Thanks so much for the feedback.

On Thu, Mar 15, 2018 at 5:38 PM, Wes McKinney <we...@gmail.com> wrote:
>
> Per ARROW-2189 -- do you get these problems if you build from source?
> I'm trying to understand if this is a packaging issue or a code issue

I plan to try building from source. I will update the issue if I have any
news.

> On Mon, Feb 26, 2018 at 10:22 AM, Rares Vernica <rv...@gmail.com>
wrote:
>
>> - On the coordinator side, do I really need to read and write a record
>> batch? Could I copy the buffer directly somehow?
>
> No, you don't need to necessarily. The idea of the Message* classes in
> arrow::ipc is to facilitate transporting messages while being agnostic
> to their comments. This would be a useful test case to flesh out these
> APIs. Could you please open some JIRAs about this? There are already
> some Message-related JIRAs open so take a look at what is there
> already.

I found these JIRA issues related to this:

* https://issues.apache.org/jira/browse/ARROW-1860
* https://issues.apache.org/jira/browse/ARROW-2006
* https://issues.apache.org/jira/browse/ARROW-2027

For my case, would the following work:

0. Get the uint8_t* "data" and uint32_t "size" pair from my underlying
framework
1. Create a Buffer Reader using the "data" and "size" pair
2. Create a Message Reader using Buffer Reader
3. While there are messages in the Message Reader
  3.1. Read next Message
  3.2. Write Message to Output Stream

All of this will happen at the coordinator, so it will loop over the nodes
and process all the "data" received from each in no particular order.
Nevertheless, all the data of one node is processed at once entirely.

Cheers,
Rares

Re: C++ optimize stream output

Posted by Wes McKinney <we...@gmail.com>.
I opened https://issues.apache.org/jira/browse/ARROW-2319 per the
buffered output point

On Thu, Mar 15, 2018 at 8:38 PM, Wes McKinney <we...@gmail.com> wrote:
> hi Rares,
>
> sorry for the delay in writing back. To your questions
>
>    - Does this look good decent? Could the API be used in more efficient
>    ways in order to achieve the same goal?
>
> This seems perfectly reasonable with the API as it is now
>
>    - On each node side, do steps #3 and #7 copy data?
>
> Step 3 does not copy data, but step 7 does. You could do better by
> writing directly to the network protocol instead of using
> io::BufferOutputStream (which accumulates data in an in-memory
> buffer). It would be useful to have a buffered writer to limit the
> number of writes to the socket (or whatever protocol is being used).
> Feel free to open a JIRA about this
>
>    - On the coordinator side, do steps #3.2, and #3.3 copy data?
>
> Per ARROW-2189 -- do you get these problems if you build from source?
> I'm trying to understand if this is a packaging issue or a code issue
>
> Step 3.2 does not copy data Step 3.3 does write data to the FileOutputStream
>
>    - On the coordinator side, do I really need to read and write a record
>    batch? Could I copy the buffer directly somehow?
>
> No, you don't need to necessarily. The idea of the Message* classes in
> arrow::ipc is to facilitate transporting messages while being agnostic
> to their comments. This would be a useful test case to flesh out these
> APIs. Could you please open some JIRAs about this? There are already
> some Message-related JIRAs open so take a look at what is there
> already.
>
> Thanks
> Wes
>
>
> On Mon, Feb 26, 2018 at 10:22 AM, Rares Vernica <rv...@gmail.com> wrote:
>> Hello,
>>
>> I am using the C++ API to serialize and centralize data over the network. I
>> am wondering if I am using the API in an efficient way.
>>
>> I have multiple nodes and a coordinator communicating over the network. I
>> do not have fine control over the network communication. Individual nodes
>> write one chunk of data to the network. The coordinator will receive all
>> the chunks and can loop over them.
>>
>> On each node I do the following (the code is here
>> <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L512>
>> ):
>>
>>    1. Append data to Builders
>>    2. Finish Builders and get Arrays
>>    3. Create Record Batch from Arrays
>>    4. Create Pool Buffer
>>    5. Create Buffer Output Stream using Pool Buffer
>>    6. Open Record Batch Stream Writer using Buffer Output Stream
>>    7. Write Record Batch to writer
>>    8. Write Buffer data to network
>>
>> On the coordinator I do the following (the code is here
>> <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1000>
>> ):
>>
>>    1. Open File Output Stream
>>    2. Open Record Batch Stream Writer using File Output Stream
>>    3. For each chunk retrieved from the network
>>       1. Create Buffer Reader using data retrieved from the network (the
>>       code is here
>>       <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1054>
>>       )
>>       2. Create Record Batch Stream Reader using Buffer Reader and read
>>       Record Batch (I plan to use ReadRecordBatch, but I'm having
>> issues like in
>>       ARROW-2189)
>>       3. Write Record Batch
>>
>> A few questions:
>>
>>    - Does this look good decent? Could the API be used in more efficient
>>    ways in order to achieve the same goal?
>>    - On each node side, do steps #3 and #7 copy data?
>>    - On the coordinator side, do steps #3.2, and #3.3 copy data?
>>    - On the coordinator side, do I really need to read and write a record
>>    batch? Could I copy the buffer directly somehow?
>>
>> Thank you so much!
>> Rares
>>
>>
>>
>> (Could the same Pool Buffer be reused across calls?)

Re: C++ optimize stream output

Posted by Wes McKinney <we...@gmail.com>.
hi Rares,

sorry for the delay in writing back. To your questions

   - Does this look good decent? Could the API be used in more efficient
   ways in order to achieve the same goal?

This seems perfectly reasonable with the API as it is now

   - On each node side, do steps #3 and #7 copy data?

Step 3 does not copy data, but step 7 does. You could do better by
writing directly to the network protocol instead of using
io::BufferOutputStream (which accumulates data in an in-memory
buffer). It would be useful to have a buffered writer to limit the
number of writes to the socket (or whatever protocol is being used).
Feel free to open a JIRA about this

   - On the coordinator side, do steps #3.2, and #3.3 copy data?

Per ARROW-2189 -- do you get these problems if you build from source?
I'm trying to understand if this is a packaging issue or a code issue

Step 3.2 does not copy data Step 3.3 does write data to the FileOutputStream

   - On the coordinator side, do I really need to read and write a record
   batch? Could I copy the buffer directly somehow?

No, you don't need to necessarily. The idea of the Message* classes in
arrow::ipc is to facilitate transporting messages while being agnostic
to their comments. This would be a useful test case to flesh out these
APIs. Could you please open some JIRAs about this? There are already
some Message-related JIRAs open so take a look at what is there
already.

Thanks
Wes


On Mon, Feb 26, 2018 at 10:22 AM, Rares Vernica <rv...@gmail.com> wrote:
> Hello,
>
> I am using the C++ API to serialize and centralize data over the network. I
> am wondering if I am using the API in an efficient way.
>
> I have multiple nodes and a coordinator communicating over the network. I
> do not have fine control over the network communication. Individual nodes
> write one chunk of data to the network. The coordinator will receive all
> the chunks and can loop over them.
>
> On each node I do the following (the code is here
> <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L512>
> ):
>
>    1. Append data to Builders
>    2. Finish Builders and get Arrays
>    3. Create Record Batch from Arrays
>    4. Create Pool Buffer
>    5. Create Buffer Output Stream using Pool Buffer
>    6. Open Record Batch Stream Writer using Buffer Output Stream
>    7. Write Record Batch to writer
>    8. Write Buffer data to network
>
> On the coordinator I do the following (the code is here
> <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1000>
> ):
>
>    1. Open File Output Stream
>    2. Open Record Batch Stream Writer using File Output Stream
>    3. For each chunk retrieved from the network
>       1. Create Buffer Reader using data retrieved from the network (the
>       code is here
>       <https://github.com/Paradigm4/accelerated_io_tools/blob/e02aa37eb464d2eae501a36e4297adb28467f311/src/PhysicalAioSave.cpp#L1054>
>       )
>       2. Create Record Batch Stream Reader using Buffer Reader and read
>       Record Batch (I plan to use ReadRecordBatch, but I'm having
> issues like in
>       ARROW-2189)
>       3. Write Record Batch
>
> A few questions:
>
>    - Does this look good decent? Could the API be used in more efficient
>    ways in order to achieve the same goal?
>    - On each node side, do steps #3 and #7 copy data?
>    - On the coordinator side, do steps #3.2, and #3.3 copy data?
>    - On the coordinator side, do I really need to read and write a record
>    batch? Could I copy the buffer directly somehow?
>
> Thank you so much!
> Rares
>
>
>
> (Could the same Pool Buffer be reused across calls?)