Posted to user@arrow.apache.org by Joel Ziegler <co...@freenet.de> on 2022/07/05 15:04:33 UTC

[C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Hi folks,

I read some data from a PostgreSQL database, convert it into
RecordBatches and try to send them to a client, but I am struggling
to understand how to use Apache Arrow C/GLib properly.

My information sources are the [C++ docs][1], [the Apache Arrow C/GLib 
reference manual][2] and [the C/GLib Github files][3].

By following the usage description of Apache Arrow C++ and experimenting
with the wrapper classes in C, I built this minimal example that writes
a RecordBatch into a buffer and (after theoretically sending and
receiving the buffer) tries to read that buffer back into a
RecordBatch. It fails, and I would be glad if you could point out my
mistakes!

I omitted the error handling for readability. The code errors out when
creating the GArrowRecordBatchStreamReader. If I use arrowbuffer or the
buffer from the top to create the input stream, the error reads:

```[record-batch-stream-reader][open]: IOError: Expected IPC message of type schema but got record batch```

If I use testBuffer, the error complains about an invalid IPC stream,
so the data is simply corrupt.


```
void testRecordbatchStream(GArrowRecordBatch *rb){
    GError *error = NULL;

    // Write Recordbatch
    GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
    GArrowBufferOutputStream *bufferStream =
        garrow_buffer_output_stream_new(buffer);
    long written = garrow_output_stream_write_record_batch(
        GARROW_OUTPUT_STREAM(bufferStream), rb, NULL, &error);

    // Use buffer as plain bytes
    void *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
    size_t length = garrow_buffer_get_size(GARROW_BUFFER(buffer));

    // Read plain bytes and test serialize function
    GArrowBuffer *testBuffer = garrow_buffer_new(data, length);
    GArrowBuffer *arrowbuffer = garrow_record_batch_serialize(rb, NULL, &error);

    // Read RecordBatch from buffer
    GArrowBufferInputStream *inputStream =
        garrow_buffer_input_stream_new(arrowbuffer);
    GArrowRecordBatchStreamReader *sr = garrow_record_batch_stream_reader_new(
        GARROW_INPUT_STREAM(inputStream), &error);
    GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(sr, &error);

    printf("Received RB: \n%s\n", garrow_record_batch_to_string(rb2, &error));
}
```


   [1]: https://arrow.apache.org/docs/cpp/index.html
   [2]: https://arrow.apache.org/docs/c_glib/arrow-glib/
   [3]: https://github.com/apache/arrow/tree/master/c_glib


Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

How do you send the written data over the network? Do you
use a raw socket(2) and write(2)? If so, can you wrap the
raw socket in a GUnixOutputStream[1]? You can do that by
passing the file descriptor of the raw socket to
g_unix_output_stream_new()[2].

[1] https://docs.gtk.org/gio/class.UnixOutputStream.html
[2] https://docs.gtk.org/gio/ctor.UnixOutputStream.new.html

If you can wrap the raw socket in a GUnixOutputStream, you
don't need to create a GArrowBuffer for the serialized record
batches. You can write them to the socket directly, for
example by passing the GUnixOutputStream to
garrow_gio_output_stream_new() and using the result as the
sink of a GArrowRecordBatchStreamWriter.
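If you stay with plain write(2) instead of a GIO stream, keep in mind that write(2) and read(2) on a socket may transfer fewer bytes than requested. The following is a minimal sketch, assuming a blocking, connected socket (or any other file descriptor); write_all() and read_all() are hypothetical helper names, not part of Arrow or GLib. The sender would pass the raw bytes of the serialized IPC stream to write_all() and then close its end; the receiver would read until end-of-file and wrap the result with garrow_buffer_new().

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Write exactly len bytes to fd, retrying on short writes and EINTR.
 * Returns 0 on success, -1 on error (errno is set by write()). */
static int write_all(int fd, const void *buf, size_t len)
{
  const uint8_t *p = buf;
  while (len > 0) {
    ssize_t n = write(fd, p, len);
    if (n < 0) {
      if (errno == EINTR)
        continue;
      return -1;
    }
    p += n;
    len -= (size_t)n;
  }
  return 0;
}

/* Read from fd until end-of-file (the peer closed its end) into a newly
 * malloc()ed buffer. Returns the buffer (the caller frees it) and stores
 * its size in *out_len, or returns NULL on error. */
static uint8_t *read_all(int fd, size_t *out_len)
{
  size_t cap = 4096, len = 0;
  uint8_t *buf = malloc(cap);
  if (buf == NULL)
    return NULL;
  for (;;) {
    if (len == cap) {              /* grow the buffer geometrically */
      uint8_t *grown = realloc(buf, cap * 2);
      if (grown == NULL) {
        free(buf);
        return NULL;
      }
      buf = grown;
      cap *= 2;
    }
    ssize_t n = read(fd, buf + len, cap - len);
    if (n < 0) {
      if (errno == EINTR)
        continue;
      free(buf);
      return NULL;
    }
    if (n == 0)                    /* end-of-file */
      break;
    len += (size_t)n;
  }
  *out_len = len;
  return buf;
}
```

Reading until end-of-file assumes the sender closes the connection after sending one stream.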

I created examples that send and receive record batches over
the network: https://github.com/apache/arrow/pull/13590

This may help you.


Thanks,
-- 
kou

In <2d...@freenet.de>
  "Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Mon, 11 Jul 2022 15:48:38 +0200,
  Joel Ziegler <co...@freenet.de> wrote:


Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Joel Ziegler <co...@freenet.de>.
Man, I should stop assuming that return types with "data" in their name
are just bytes and read up on the data type instead. Sorry about that;
it's my first time using GLib and Arrow.

Thanks a lot for the help, again! I am able to send RecordBatches over 
the network now.

A new problem came up. I managed to solve it, but I am not sure whether
my solution is appropriate, so I would be glad to hear your opinion.

I started splitting bigger Tables into multiple RecordBatches, sending
them over the network and reading them with
garrow_record_batch_reader_read_next(). However, I created a new
GArrowRecordBatchStreamWriter for each RecordBatch and closed it with
g_object_unref() before sending the data, because I wanted to "close"
the writer before reading from the output buffer. This led to the
StreamReader only seeing the first RecordBatch in the stream, probably
because each writer writes an EOS marker. So I stopped calling
g_object_unref() on the StreamWriter and just read from the buffer,
which seems to work fine. Am I just lucky? Or is there another way to
safely read parts of the buffer even though more RecordBatches will be
written in the future?
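One way to check the EOS hypothesis is to look at the last bytes each writer produced. According to the Arrow columnar format specification, a stream written in the current (non-legacy) IPC format ends with an 8-byte end-of-stream marker: the 0xFFFFFFFF continuation indicator followed by a 32-bit length of zero. A reader stops at that marker, so anything appended after it is never read. The sketch below assumes the bytes of one writer's output are already in memory (for example the data_raw/data_size pair obtained from g_bytes_get_data()); ends_with_ipc_eos() is a hypothetical helper, and it only works as a heuristic because a record batch body could in principle end with the same 8 bytes.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* End-of-stream marker of the Arrow IPC streaming format (non-legacy):
 * 0xFFFFFFFF continuation indicator followed by a 32-bit length of 0. */
static const uint8_t IPC_EOS_MARKER[8] = {
  0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00
};

/* Heuristic: returns true if the len bytes at buf end with the IPC EOS
 * marker, which suggests the writer that produced them was closed. */
static bool ends_with_ipc_eos(const uint8_t *buf, size_t len)
{
  if (len < sizeof IPC_EOS_MARKER)
    return false;
  return memcmp(buf + len - sizeof IPC_EOS_MARKER, IPC_EOS_MARKER,
                sizeof IPC_EOS_MARKER) == 0;
}
```

Streams written in the legacy format (before Arrow 0.15) ended with a 4-byte zero instead, so this check would report false for them.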

I also wanted to ask: where can I find documentation on how to use
these Arrow writer classes? The GLib classes are well documented, and I
simply missed the information you provided because of false
assumptions. But I can't find any Arrow documentation that explains how
to use the writer classes the way you explained it to me.

Sorry if I am asking too much. I am also fine if you just point me in a
direction or send some links so I can find the solution myself. You
don't have to build my code :)


Sincerely, Joel Ziegler


On 09.07.22 04:48, Sutou Kouhei wrote:

Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

>     GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
>     gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
> 
>     GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);

data is a GBytes *, not a const char *. You need to get the
raw data out of the GBytes *:

  GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));

  gsize data_size;
  gconstpointer data_raw = g_bytes_get_data(data, &data_size);
  GArrowBuffer *receivingBuffer = garrow_buffer_new(data_raw, data_size);

And you need to call g_bytes_unref() on data when it is no
longer needed. garrow_buffer_new() does not copy the bytes, so
keep data alive as long as receivingBuffer is in use:

  g_bytes_unref(data);


Thanks,
-- 
kou

In <4f...@freenet.de>
  "Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Fri, 8 Jul 2022 15:05:52 +0200,
  Joel Ziegler <co...@freenet.de> wrote:


Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Joel Ziegler <co...@freenet.de>.
Hi Sutou Kouhei,

closing the writer before requesting the data does not solve the problem
on my side. Is there another mistake I made? The error happens when
creating the RecordBatchStreamReader.


void testRecordbatchStream(GArrowRecordBatch *rb){
    GError *error = NULL;
    GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
    if(buffer == NULL){
        fprintf(stderr, "Failed to initialize resizable buffer! Error message: %s\n",
                error->message);
        g_error_free(error);
    }

    GArrowBufferOutputStream *bufferStream = garrow_buffer_output_stream_new(buffer);
    GArrowSchema *schema = garrow_record_batch_get_schema(rb);
    GArrowRecordBatchStreamWriter *sw = garrow_record_batch_stream_writer_new(
        GARROW_OUTPUT_STREAM(bufferStream), schema, &error);
    if(sw == NULL){
        fprintf(stderr, "Failed to create Record batch writer! Error message: %s\n",
                error->message);
        g_error_free(error);
    }

    g_object_unref(bufferStream);
    g_object_unref(schema);

    gboolean test = garrow_record_batch_writer_write_record_batch(
        GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
    if(!test){
        fprintf(stderr, "Failed to write Record batch! Error message: %s\n",
                error->message);
        g_error_free(error);
    }
    g_object_unref(sw);

    GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
    gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));

    GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
    // Using the initial buffer here, without the intermediate data pointer, works!
    GArrowBufferInputStream *inputStream =
        garrow_buffer_input_stream_new(GARROW_BUFFER(receivingBuffer));
    GArrowRecordBatchStreamReader *sr = garrow_record_batch_stream_reader_new(
        GARROW_INPUT_STREAM(inputStream), &error);
    if(sr == NULL){
        fprintf(stderr, "Failed to create stream reader! Error message: %s\n",
                error->message);
        g_error_free(error);
    }

    GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(
        GARROW_RECORD_BATCH_READER(sr), &error);
    if(rb2 == NULL){
        printf("Failed to create record batch from stream... Error message: %s\n",
               error->message);
        g_error_free(error);
    }else {
        printf("Recordbatch:\n%s\n", garrow_record_batch_to_string(rb2, &error));
    }

    g_object_unref(inputStream);
    g_object_unref(rb2);
    g_object_unref(sr);
    g_object_unref(buffer);
}


On 07.07.22 22:30, Sutou Kouhei wrote:

Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

>     gboolean test = garrow_record_batch_writer_write_record_batch(
>             GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
> 
>     GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
>     gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
> 
>     g_object_unref(sw);

You need to "close" the writer before you get data from the
buffer. g_object_unref(sw) closes the writer implicitly:

  gboolean test = garrow_record_batch_writer_write_record_batch(
          GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
  g_object_unref(sw);

  GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
  gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));


Thanks,
-- 
kou

In <04...@freenet.de>
  "Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Thu, 7 Jul 2022 14:17:39 +0200,
  Joel Ziegler <co...@freenet.de> wrote:


Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Joel Ziegler <co...@freenet.de>.
Thanks a lot for the reply! The conversion works as you wrote it. I am
still unsure how to send the buffer written in IPC format. I tried
getting a pointer to the data and its length so that I can simply send
the data over the network, but the buffer created from the data pointer
and length is not the same.


     gboolean test = garrow_record_batch_writer_write_record_batch(
             GARROW_RECORD_BATCH_WRITER(sw), rb, &error);

     GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
     gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));

     g_object_unref(sw);

     // Receiving side
     GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
     GArrowBufferInputStream *inputStream =
         garrow_buffer_input_stream_new(GARROW_BUFFER(receivingBuffer));


On 06.07.22 09:35, Sutou Kouhei wrote:

Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C

Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,

You need to use GArrowRecordBatchStreamWriter instead of
garrow_output_stream_write_record_batch() if you want to
read the data with GArrowRecordBatchStreamReader.

GArrowRecordBatchStreamWriter and
GArrowRecordBatchStreamReader assume the
IPC Streaming Format
https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
but garrow_output_stream_write_record_batch() just writes a
single RecordBatch message
https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message.
A stream must start with a Schema message, which is why the
reader complains that it expected a schema but got a record
batch.
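The ordering rule the stream reader enforces can be pictured with a toy model. This is written for illustration only and is not Arrow's implementation: each encapsulated message is reduced to a hypothetical MsgType value (in real streams the type lives in the FlatBuffers metadata of each message), and the hypothetical count_readable_batches() only checks that the stream starts with a schema and stops at the end-of-stream marker.

```c
#include <stddef.h>

/* Hypothetical, simplified message kinds of an IPC stream. */
typedef enum {
  MSG_SCHEMA,
  MSG_DICTIONARY_BATCH,
  MSG_RECORD_BATCH,
  MSG_EOS
} MsgType;

/* Toy stream reader: returns how many record batches it would deliver,
 * or -1 if the stream does not start with a schema message (the
 * "Expected IPC message of type schema" case). Reading stops at the
 * first EOS marker or at the end of the input. */
static int count_readable_batches(const MsgType *msgs, size_t n)
{
  if (n == 0 || msgs[0] != MSG_SCHEMA)
    return -1;
  int batches = 0;
  for (size_t i = 1; i < n; i++) {
    if (msgs[i] == MSG_EOS)
      break;                 /* anything after EOS is never read */
    if (msgs[i] == MSG_RECORD_BATCH)
      batches++;
  }
  return batches;
}
```

In this model, a bare RecordBatch message is rejected, while a schema followed by two batches and an EOS yields both batches.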

void testRecordbatchStream(GArrowRecordBatch *rb){
  GError *error = NULL;
  // Error checks are omitted for brevity.

  // Write Recordbatch
  GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
  GArrowBufferOutputStream *bufferStream =
    garrow_buffer_output_stream_new(buffer);
  GArrowSchema *schema = garrow_record_batch_get_schema(rb);
  GArrowRecordBatchStreamWriter *writer =
    garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(bufferStream),
                                          schema, &error);
  g_object_unref(schema);
  g_object_unref(bufferStream);
  garrow_record_batch_writer_write_record_batch(
    GARROW_RECORD_BATCH_WRITER(writer), rb, &error);
  // Releasing the writer also closes it.
  g_object_unref(writer);

  // Read RecordBatch from buffer
  GArrowBufferInputStream *inputStream =
    garrow_buffer_input_stream_new(GARROW_BUFFER(buffer));
  GArrowRecordBatchStreamReader *sr =
    garrow_record_batch_stream_reader_new(
      GARROW_INPUT_STREAM(inputStream), &error);
  g_object_unref(inputStream);
  GArrowRecordBatch *rb2 =
    garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(sr),
                                         &error);
  // garrow_record_batch_to_string() returns a newly allocated string.
  gchar *rb2_string = garrow_record_batch_to_string(rb2, &error);
  printf("Received RB: \n%s\n", rb2_string);
  g_free(rb2_string);
  g_object_unref(rb2);
  g_object_unref(sr);

  g_object_unref(buffer);
}

Your code is missing g_object_unref() calls. You need to call
g_object_unref() when an object is no longer needed. If you
forget to call g_object_unref(), your program leaks memory.


Thanks,
-- 
kou

In <de...@freenet.de>
  "[C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Tue, 5 Jul 2022 17:04:33 +0200,
  Joel Ziegler <co...@freenet.de> wrote:
