Posted to user@arrow.apache.org by Joel Ziegler <co...@freenet.de> on 2022/07/05 15:04:33 UTC
[C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Hi folks,
I read some data from a PostgreSQL database, convert it into
RecordBatches and try to send the data to a client. But I don't
properly understand how to use Apache Arrow C/GLib.
My information sources are the [C++ docs][1], [the Apache Arrow C/GLib
reference manual][2] and [the C/GLib Github files][3].
By following the usage description of Apache Arrow C++ and experimenting
with the wrapper classes in C, I built this minimal example that writes
a RecordBatch into a buffer and (after theoretically sending and
receiving the buffer) tries to read that buffer back into a
RecordBatch. But it fails, and I would be glad if you could point out my
mistakes!
I omitted the error handling for readability. The code errors out at
the creation of the GArrowRecordBatchStreamReader. If I use arrowbuffer
or the buffer from the top to create the input stream, the error reads:
```[record-batch-stream-reader][open]: IOError: Expected IPC message of
type schema but got record batch```.
If I use testBuffer, the error complains about an invalid IPC stream,
so the data seems to be corrupt.
```
void testRecordbatchStream(GArrowRecordBatch *rb){
  GError *error = NULL;
  // Write Recordbatch
  GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
  GArrowBufferOutputStream *bufferStream =
    garrow_buffer_output_stream_new(buffer);
  long written =
    garrow_output_stream_write_record_batch(GARROW_OUTPUT_STREAM(bufferStream),
                                            rb, NULL, &error);
  // Use buffer as plain bytes
  void *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
  size_t length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
  // Read plain bytes and test serialize function
  GArrowBuffer *testBuffer = garrow_buffer_new(data, length);
  GArrowBuffer *arrowbuffer = garrow_record_batch_serialize(rb, NULL, &error);
  // Read RecordBatch from buffer
  GArrowBufferInputStream *inputStream =
    garrow_buffer_input_stream_new(arrowbuffer);
  GArrowRecordBatchStreamReader *sr =
    garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(inputStream),
                                          &error);
  GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(sr, &error);
  printf("Received RB: \n%s\n", garrow_record_batch_to_string(rb2, &error));
}
```
[1]: https://arrow.apache.org/docs/cpp/index.html
[2]: https://arrow.apache.org/docs/c_glib/arrow-glib/
[3]: https://github.com/apache/arrow/tree/master/c_glib
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,
How do you send the written data over the network? Do you
use a raw socket(2) and write(2)? If you use a raw socket, can
you wrap it with GUnixOutputStream[1]? You can wrap a raw
socket with g_unix_output_stream_new()[2] by passing the
socket's file descriptor.
[1] https://docs.gtk.org/gio/class.UnixOutputStream.html
[2] https://docs.gtk.org/gio/ctor.UnixOutputStream.new.html
If you can wrap the raw socket with GUnixOutputStream, you
don't need to create a GArrowBuffer for the serialized record
batches. You can write the serialized record batches to the
socket directly.
I have created examples that send and receive record batches
over the network: https://github.com/apache/arrow/pull/13590
They may help you.
Thanks,
--
kou
In <2d...@freenet.de>
"Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Mon, 11 Jul 2022 15:48:38 +0200,
Joel Ziegler <co...@freenet.de> wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Joel Ziegler <co...@freenet.de>.
Man, I should stop assuming that return types with "data" in their name
are just bytes, and read up on the data type instead. Sorry about that;
it's my first time with GLib and Arrow.
Thanks a lot for the help again! I am now able to send RecordBatches
over the network.
A new problem arose. I was able to solve it, but I am not sure whether
my solution is appropriate. I would be glad to hear your opinion.
I started splitting bigger tables into multiple RecordBatches, sending
them over the network and reading them with
garrow_record_batch_reader_read_next(). But I created a new
GArrowRecordBatchStreamWriter for each RecordBatch and closed it with
g_object_unref() before sending the data, because I wanted to "close"
the writer before reading from the output buffer. This led to the
stream reader seeing only the first RecordBatch in the stream, probably
because the writer writes an end-of-stream (EOS) marker when it is
closed. So I stopped calling g_object_unref() on the stream writer and
just read from the buffer, which seems to work fine. Am I just lucky?
Or is there another way to safely read parts of the buffer while more
RecordBatches will still be written to it?
I also wanted to ask: where can I find documentation on how to use
these Arrow writer classes? The GLib classes are well documented, and I
just failed to find the information you provided because of false
assumptions. But I can't find Arrow documentation that explains how to
use the writer classes the way you explained it to me.
Sorry if I am asking too much. I am also fine if you just point me in a
direction or send some links so that I can find the solution myself.
You don't have to write my code :)
Sincerely, Joel Ziegler
On 09.07.22 04:48, Sutou Kouhei wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,
> GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
> gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
>
> GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
data is a GBytes *, not a const char *. You need to get the
raw data from the GBytes *:
GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
gsize data_size;
gconstpointer data_raw = g_bytes_get_data(data, &data_size);
GArrowBuffer *receivingBuffer = garrow_buffer_new(data_raw, data_size);
And you need to call g_bytes_unref() on data when it is no
longer needed:
g_bytes_unref(data);
Thanks,
--
kou
In <4f...@freenet.de>
"Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Fri, 8 Jul 2022 15:05:52 +0200,
Joel Ziegler <co...@freenet.de> wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Joel Ziegler <co...@freenet.de>.
Hi Sutou Kouhei,
closing the writer before requesting the data does not solve the problem
at my side. Any other error i made? The error happens at the creation of
the RecordBatchStreamReader.
void testRecordbatchStream(GArrowRecordBatch *rb){
  GError *error = NULL;
  GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
  if(buffer == NULL){
    fprintf(stderr, "Failed to initialize resizable buffer! Error message: %s\n",
            error->message);
    g_clear_error(&error);
  }
  GArrowBufferOutputStream *bufferStream =
    garrow_buffer_output_stream_new(buffer);
  GArrowSchema *schema = garrow_record_batch_get_schema(rb);
  GArrowRecordBatchStreamWriter *sw =
    garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(bufferStream),
                                          schema, &error);
  if(sw == NULL){
    fprintf(stderr, "Failed to create Record batch writer! Error message: %s\n",
            error->message);
    g_clear_error(&error);
  }
  g_object_unref(bufferStream);
  g_object_unref(schema);
  gboolean test =
    garrow_record_batch_writer_write_record_batch(GARROW_RECORD_BATCH_WRITER(sw),
                                                  rb, &error);
  if(!test){
    fprintf(stderr, "Failed to write Record batch! Error message: %s\n",
            error->message);
    g_clear_error(&error);
  }
  g_object_unref(sw);
  GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
  gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
  GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
  // Using the initial buffer here, without the intermediate data pointer, works!
  GArrowBufferInputStream *inputStream =
    garrow_buffer_input_stream_new(GARROW_BUFFER(receivingBuffer));
  GArrowRecordBatchStreamReader *sr =
    garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(inputStream),
                                          &error);
  if(sr == NULL){
    fprintf(stderr, "Failed to create stream reader! Error message: %s\n",
            error->message);
    g_clear_error(&error);
  }
  GArrowRecordBatch *rb2 =
    garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(sr),
                                         &error);
  if(rb2 == NULL){
    printf("Failed to create record batch from stream... Error message: %s\n",
           error->message);
    g_clear_error(&error);
  }else {
    printf("Recordbatch:\n%s\n",
           garrow_record_batch_to_string(rb2, &error));
  }
  g_object_unref(inputStream);
  g_object_unref(rb2);
  g_object_unref(sr);
  g_object_unref(buffer);
}
On 07.07.22 22:30, Sutou Kouhei wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,
> gboolean test = garrow_record_batch_writer_write_record_batch(
> GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
>
> GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
> gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
>
> g_object_unref(sw);
You need to "close" the writer before you get the data from
the buffer. g_object_unref(sw) closes the writer implicitly:
gboolean test = garrow_record_batch_writer_write_record_batch(
GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
g_object_unref(sw);
GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
Thanks,
--
kou
In <04...@freenet.de>
"Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Thu, 7 Jul 2022 14:17:39 +0200,
Joel Ziegler <co...@freenet.de> wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Joel Ziegler <co...@freenet.de>.
Thanks a lot for the reply! The conversion works as you wrote it. I am
still unsure how to send the buffer written in the IPC format. I tried
getting a pointer to the data and its length so that I can simply send
the data over the network, but the buffer created from that data
pointer and length is not the same.
gboolean test = garrow_record_batch_writer_write_record_batch(
  GARROW_RECORD_BATCH_WRITER(sw), rb, &error);
GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
g_object_unref(sw);
// Receiving side
GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
GArrowBufferInputStream *inputStream =
  garrow_buffer_input_stream_new(GARROW_BUFFER(receivingBuffer));
On 06.07.22 09:35, Sutou Kouhei wrote:
Re: [C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C
Posted by Sutou Kouhei <ko...@clear-code.com>.
Hi,
To read the data with GArrowRecordBatchStreamReader, you need
to write it with GArrowRecordBatchStreamWriter instead of
garrow_output_stream_write_record_batch().
GArrowRecordBatchStreamWriter and
GArrowRecordBatchStreamReader use the
IPC Streaming Format
https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
(a Schema message followed by RecordBatch messages), but
garrow_output_stream_write_record_batch() only writes a single
RecordBatch message
https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message
.
void testRecordbatchStream(GArrowRecordBatch *rb){
  GError *error = NULL;
  // Write Recordbatch
  GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
  GArrowBufferOutputStream *bufferStream =
    garrow_buffer_output_stream_new(buffer);
  GArrowSchema *schema = garrow_record_batch_get_schema(rb);
  GArrowRecordBatchStreamWriter *writer =
    garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(bufferStream),
                                          schema, &error);
  g_object_unref(schema);
  g_object_unref(bufferStream);
  garrow_record_batch_writer_write_record_batch(
    GARROW_RECORD_BATCH_WRITER(writer), rb, &error);
  // Unreferencing the writer closes it implicitly.
  g_object_unref(writer);
  // Read RecordBatch from buffer
  GArrowBufferInputStream *inputStream =
    garrow_buffer_input_stream_new(GARROW_BUFFER(buffer));
  GArrowRecordBatchStreamReader *sr =
    garrow_record_batch_stream_reader_new(
      GARROW_INPUT_STREAM(inputStream), &error);
  g_object_unref(inputStream);
  GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(
    GARROW_RECORD_BATCH_READER(sr), &error);
  // to_string() returns a newly allocated string that must be freed.
  gchar *rb2_string = garrow_record_batch_to_string(rb2, &error);
  printf("Received RB: \n%s\n", rb2_string);
  g_free(rb2_string);
  g_object_unref(rb2);
  g_object_unref(sr);
  g_object_unref(buffer);
}
Your code is missing g_object_unref() calls. You need to call
g_object_unref() when an object is no longer needed; if you
forget to call it, your program leaks memory.
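To make the format difference more concrete, here is a small plain-C sketch (no Arrow calls) that inspects the framing bytes. It assumes the current IPC framing from the Columnar format specification: each encapsulated message starts with the continuation marker 0xFFFFFFFF followed by a 32-bit metadata length, and a stream writer that is closed appends an 8-byte end-of-stream marker (0xFFFFFFFF followed by 0x00000000). A lone message written by garrow_output_stream_write_record_batch() has no such trailer. The helper names are made up for this sketch, and the check is only a debugging heuristic: it does not parse message headers, so it cannot tell a Schema message from a RecordBatch message.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Continuation marker that precedes every encapsulated IPC message. */
static const uint8_t CONTINUATION[4] = {0xFF, 0xFF, 0xFF, 0xFF};

/* Hypothetical helper: true if the bytes start like an encapsulated IPC
 * message (continuation marker first). */
static bool
starts_with_continuation(const uint8_t *data, size_t length)
{
  return length >= 4 && memcmp(data, CONTINUATION, 4) == 0;
}

/* Hypothetical helper: true if the bytes end with the 8-byte end-of-stream
 * marker that a stream writer appends when it is closed. */
static bool
ends_with_eos(const uint8_t *data, size_t length)
{
  static const uint8_t EOS[8] = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
  return length >= 8 && memcmp(data + length - 8, EOS, 8) == 0;
}
```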
Thanks,
--
kou
In <de...@freenet.de>
"[C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Tue, 5 Jul 2022 17:04:33 +0200,
Joel Ziegler <co...@freenet.de> wrote: