You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/07/13 04:53:34 UTC
[arrow] branch master updated: ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8a1649a528 ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
8a1649a528 is described below
commit 8a1649a528b2e940dd9183ca666fd44b514bbde1
Author: Sutou Kouhei <ko...@clear-code.com>
AuthorDate: Wed Jul 13 13:53:25 2022 +0900
ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
Authored-by: Sutou Kouhei <ko...@clear-code.com>
Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
c_glib/arrow-glib/field.cpp | 2 +-
c_glib/example/meson.build | 8 ++
c_glib/example/receive-network.c | 212 +++++++++++++++++++++++++++++++++++++++
c_glib/example/send-network.c | 187 ++++++++++++++++++++++++++++++++++
4 files changed, 408 insertions(+), 1 deletion(-)
diff --git a/c_glib/arrow-glib/field.cpp b/c_glib/arrow-glib/field.cpp
index 1bb2dd1812..526f9a6773 100644
--- a/c_glib/arrow-glib/field.cpp
+++ b/c_glib/arrow-glib/field.cpp
@@ -160,7 +160,7 @@ garrow_field_import(gpointer c_abi_schema, GError **error)
/**
* garrow_field_new:
* @name: The name of the field.
- * @data_type: The data type of the field.
+ * @data_type: (transfer full): The data type of the field.
*
* Returns: A newly created #GArrowField.
*/
diff --git a/c_glib/example/meson.build b/c_glib/example/meson.build
index e43f1553dd..e2d55d4788 100644
--- a/c_glib/example/meson.build
+++ b/c_glib/example/meson.build
@@ -29,12 +29,20 @@ executable('read-file', 'read-file.c',
executable('read-stream', 'read-stream.c',
dependencies: [arrow_glib],
link_language: 'c')
+executable('receive-network', 'receive-network.c',
+ dependencies: [arrow_glib],
+ link_language: 'c')
+executable('send-network', 'send-network.c',
+ dependencies: [arrow_glib],
+ link_language: 'c')
install_data('README.md',
'build.c',
'extension-type.c',
'read-file.c',
'read-stream.c',
+ 'receive-network.c',
+ 'send-network.c',
install_dir: join_paths(data_dir, meson.project_name(), 'example'))
subdir('lua')
diff --git a/c_glib/example/receive-network.c b/c_glib/example/receive-network.c
new file mode 100644
index 0000000000..aa7aaa0140
--- /dev/null
+++ b/c_glib/example/receive-network.c
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include <arrow-glib/arrow-glib.h>
+
+#ifdef G_OS_UNIX
+# include <glib-unix.h>
+# include <signal.h>
+#endif
+
+/*
+ * "event" signal handler for the socket service.
+ *
+ * Every event other than G_SOCKET_LISTENER_BOUND is ignored. For a
+ * bound socket, its local address is printed as "address: ...".
+ *
+ * @listener, @event, @socket and @user_data are supplied by the signal
+ * emission. @socket is only borrowed for the duration of the call, so
+ * this handler must never unreference it.
+ */
+static void
+service_event(GSocketListener *listener,
+              GSocketListenerEvent event,
+              GSocket *socket,
+              gpointer user_data)
+{
+  if (event != G_SOCKET_LISTENER_BOUND) {
+    return;
+  }
+
+  GError *error = NULL;
+  GSocketAddress *local_address = g_socket_get_local_address(socket, &error);
+  if (!local_address) {
+    g_print("failed to get local address: %s\n", error->message);
+    g_error_free(error);
+    /* No g_object_unref(socket) here: the reference belongs to the
+     * listener that emitted the signal. */
+    return;
+  }
+
+  /* The string and the address are both owned by us: release them
+   * once the address has been printed. */
+  gchar *local_address_string =
+    g_socket_connectable_to_string(G_SOCKET_CONNECTABLE(local_address));
+  g_print("address: %s\n", local_address_string);
+  g_free(local_address_string);
+  g_object_unref(local_address);
+}
+
+/*
+ * Print the values of @array to standard output as "[v0, v1, ...]"
+ * followed by a newline.
+ *
+ * Boolean, signed/unsigned 8/16/32/64-bit integer, float and double
+ * arrays are supported. Null slots are printed as "null" instead of
+ * reading their (meaningless) value. For any other value type only
+ * "[]" is printed.
+ */
+static void
+print_array(GArrowArray *array)
+{
+  GArrowType value_type;
+  gint64 i, n;
+
+  value_type = garrow_array_get_value_type(array);
+
+  g_print("[");
+  n = garrow_array_get_length(array);
+
+/* Expands to one `case` that casts @array to the concrete array type
+ * and prints each element with the given printf-style format. */
+#define ARRAY_CASE(type, Type, TYPE, format)                            \
+  case GARROW_TYPE_ ## TYPE:                                            \
+    {                                                                   \
+      GArrow ## Type ## Array *real_array;                              \
+      real_array = GARROW_ ## TYPE ## _ARRAY(array);                    \
+      for (i = 0; i < n; i++) {                                         \
+        if (i > 0) {                                                    \
+          g_print(", ");                                                \
+        }                                                               \
+        if (garrow_array_is_null(array, i)) {                           \
+          g_print("null");                                              \
+          continue;                                                     \
+        }                                                               \
+        g_print(format,                                                 \
+                garrow_ ## type ## _array_get_value(real_array, i));    \
+      }                                                                 \
+    }                                                                   \
+    break
+
+  switch (value_type) {
+  case GARROW_TYPE_BOOLEAN:
+    {
+      /* Booleans have no numeric format: print them as words. */
+      GArrowBooleanArray *boolean_array = GARROW_BOOLEAN_ARRAY(array);
+      for (i = 0; i < n; i++) {
+        if (i > 0) {
+          g_print(", ");
+        }
+        if (garrow_array_is_null(array, i)) {
+          g_print("null");
+          continue;
+        }
+        g_print("%s",
+                garrow_boolean_array_get_value(boolean_array, i) ?
+                "true" : "false");
+      }
+    }
+    break;
+  ARRAY_CASE(uint8,  UInt8,  UINT8,  "%hhu");
+  ARRAY_CASE(uint16, UInt16, UINT16, "%" G_GUINT16_FORMAT);
+  ARRAY_CASE(uint32, UInt32, UINT32, "%" G_GUINT32_FORMAT);
+  ARRAY_CASE(uint64, UInt64, UINT64, "%" G_GUINT64_FORMAT);
+  ARRAY_CASE( int8,   Int8,   INT8,  "%hhd");
+  ARRAY_CASE( int16,  Int16,  INT16, "%" G_GINT16_FORMAT);
+  ARRAY_CASE( int32,  Int32,  INT32, "%" G_GINT32_FORMAT);
+  ARRAY_CASE( int64,  Int64,  INT64, "%" G_GINT64_FORMAT);
+  ARRAY_CASE( float,  Float,  FLOAT, "%g");
+  ARRAY_CASE(double, Double, DOUBLE, "%g");
+  default:
+    break;
+  }
+#undef ARRAY_CASE
+
+  g_print("]\n");
+}
+
+/*
+ * Print every column of @record_batch to standard output.
+ *
+ * Each column is written as "columns[INDEX](NAME): " followed by the
+ * output of print_array() for that column's data.
+ */
+static void
+print_record_batch(GArrowRecordBatch *record_batch)
+{
+  const guint column_count = garrow_record_batch_get_n_columns(record_batch);
+
+  for (guint index = 0; index < column_count; index++) {
+    const gchar *column_name =
+      garrow_record_batch_get_column_name(record_batch, index);
+    g_print("columns[%u](%s): ", index, column_name);
+
+    /* The column data is released as soon as it has been printed. */
+    GArrowArray *column =
+      garrow_record_batch_get_column_data(record_batch, index);
+    print_array(column);
+    g_object_unref(column);
+  }
+}
+
+/*
+ * "incoming" signal handler: read an Arrow record batch stream from
+ * @connection and print every record batch that arrives.
+ *
+ * Reading stops when the reader reports the end of the stream (a NULL
+ * record batch without an error) or when reading fails; failures are
+ * reported on standard output.
+ *
+ * Returns: always %FALSE. The return value of this handler is a
+ *   gboolean, not a process exit status, so every path (including read
+ *   errors) returns %FALSE rather than EXIT_FAILURE.
+ */
+static gboolean
+service_incoming(GSocketService *service,
+                 GSocketConnection *connection,
+                 GObject *source_object,
+                 gpointer user_data)
+{
+  GArrowGIOInputStream *input =
+    garrow_gio_input_stream_new(
+      g_io_stream_get_input_stream(G_IO_STREAM(connection)));
+  GError *error = NULL;
+  GArrowRecordBatchStreamReader *reader =
+    garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(input), &error);
+  if (!reader) {
+    g_print("failed to create reader: %s\n", error->message);
+    g_error_free(error);
+    g_object_unref(input);
+    return FALSE;
+  }
+
+  while (TRUE) {
+    GArrowRecordBatch *record_batch =
+      garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(reader),
+                                           &error);
+    if (error) {
+      g_print("failed to read the next record batch: %s\n", error->message);
+      g_error_free(error);
+      break;
+    }
+
+    /* NULL without an error means the stream is finished. */
+    if (!record_batch) {
+      break;
+    }
+
+    print_record_batch(record_batch);
+    g_object_unref(record_batch);
+  }
+
+  /* Shared cleanup for both the normal and the error exit of the loop. */
+  g_object_unref(reader);
+  g_object_unref(input);
+
+  return FALSE;
+}
+
+#ifdef G_OS_UNIX
+/* State handed to stop() as its user data. */
+typedef struct {
+  /* Socket service that stop() unreferences. */
+ GSocketService *service;
+  /* Main loop that stop() quits. */
+ GMainLoop *loop;
+} StopData;
+
+/*
+ * Signal source callback: release the socket service and quit the
+ * main loop.
+ *
+ * @user_data points to a StopData. The same StopData may be shared by
+ * several signal sources, so the service pointer is cleared after the
+ * first call; a second call then only quits the loop again instead of
+ * unreferencing the service twice.
+ *
+ * Returns: %G_SOURCE_REMOVE, so the source that fired is not invoked
+ *   again.
+ */
+static gboolean
+stop(gpointer user_data)
+{
+  StopData *data = user_data;
+  /* g_clear_object() unreferences once and sets the pointer to NULL;
+   * it does nothing when the pointer is already NULL. */
+  g_clear_object(&data->service);
+  g_main_loop_quit(data->loop);
+  return G_SOURCE_REMOVE;
+}
+#endif
+
+/*
+ * Start a threaded socket service on any free inet port, print record
+ * batches received on incoming connections and run the main loop.
+ *
+ * On Unix, SIGINT and SIGTERM invoke stop(), which releases the
+ * service and quits the loop.
+ *
+ * Returns: EXIT_SUCCESS after the main loop has finished, EXIT_FAILURE
+ *   when no listening port could be added.
+ */
+int
+main(int argc, char **argv)
+{
+  GSocketService *service = g_threaded_socket_service_new(-1);
+  g_signal_connect(service, "event", G_CALLBACK(service_event), NULL);
+  g_signal_connect(service, "incoming", G_CALLBACK(service_incoming), NULL);
+
+  GError *error = NULL;
+  gboolean success =
+    g_socket_listener_add_any_inet_port(G_SOCKET_LISTENER(service),
+                                        NULL,
+                                        &error);
+  if (!success) {
+    g_print("failed to add a listen IP address: %s\n", error->message);
+    g_error_free(error);
+    /* Nothing else will release the service on this early exit. */
+    g_object_unref(service);
+    return EXIT_FAILURE;
+  }
+
+  g_socket_service_start(service);
+
+  GMainLoop *loop = g_main_loop_new(NULL, FALSE);
+#ifdef G_OS_UNIX
+  /* data lives on main()'s stack, which outlives the main loop run. */
+  StopData data;
+  data.service = service;
+  data.loop = loop;
+  g_unix_signal_add(SIGINT, stop, &data);
+  g_unix_signal_add(SIGTERM, stop, &data);
+#else
+  /* TODO: Implement graceful stop. */
+#endif
+  g_main_loop_run(loop);
+  g_main_loop_unref(loop);
+
+  return EXIT_SUCCESS;
+}
diff --git a/c_glib/example/send-network.c b/c_glib/example/send-network.c
new file mode 100644
index 0000000000..d298c5a173
--- /dev/null
+++ b/c_glib/example/send-network.c
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include <arrow-glib/arrow-glib.h>
+
+/*
+ * Build the schema used by this example: a "boolean" field followed by
+ * an "int32" field.
+ *
+ * The temporary field list and the field references it holds are
+ * released once the schema has been created.
+ *
+ * Returns: the result of garrow_schema_new() for those two fields.
+ */
+static GArrowSchema *
+build_schema(void)
+{
+  GArrowField *boolean_field =
+    garrow_field_new("boolean",
+                     GARROW_DATA_TYPE(garrow_boolean_data_type_new()));
+  GArrowField *int32_field =
+    garrow_field_new("int32",
+                     GARROW_DATA_TYPE(garrow_int32_data_type_new()));
+
+  GList *field_list = NULL;
+  field_list = g_list_append(field_list, boolean_field);
+  field_list = g_list_append(field_list, int32_field);
+
+  GArrowSchema *result = garrow_schema_new(field_list);
+  g_list_free_full(field_list, g_object_unref);
+
+  return result;
+}
+
+/*
+ * Build a three-row record batch matching build_schema(): a boolean
+ * column and an int32 column, each with one null slot.
+ *
+ * Failures are reported on standard output.
+ *
+ * Returns: the new record batch, or NULL on failure.
+ */
+static GArrowRecordBatch *
+build_record_batch(void)
+{
+  GArrowSchema *schema = build_schema();
+  if (!schema) {
+    return NULL;
+  }
+  GError *error = NULL;
+  GArrowRecordBatchBuilder *builder =
+    garrow_record_batch_builder_new(schema, &error);
+  g_object_unref(schema);
+  if (!builder) {
+    g_print("failed to build record batch builder: %s\n", error->message);
+    g_error_free(error);
+    return NULL;
+  }
+
+  const gint64 n_records = 3;
+
+  /* The column builders are not unreferenced on the success path, so
+   * they are treated as borrowed from the record batch builder on the
+   * error paths too: only `builder` itself is released. */
+  GArrowBooleanArrayBuilder *boolean_builder =
+    GARROW_BOOLEAN_ARRAY_BUILDER(
+      garrow_record_batch_builder_get_column_builder(builder, 0));
+  gboolean boolean_values[] = {TRUE, TRUE, FALSE};
+  gboolean boolean_is_valids[] = {TRUE, FALSE, TRUE};
+  if (!garrow_boolean_array_builder_append_values(boolean_builder,
+                                                  boolean_values,
+                                                  n_records,
+                                                  boolean_is_valids,
+                                                  n_records,
+                                                  &error)) {
+    g_print("failed to append boolean values: %s\n", error->message);
+    g_error_free(error);
+    g_object_unref(builder);
+    return NULL;
+  }
+
+  GArrowInt32ArrayBuilder *int32_builder =
+    GARROW_INT32_ARRAY_BUILDER(
+      garrow_record_batch_builder_get_column_builder(builder, 1));
+  gint32 int32_values[] = {1, 11, 111};
+  /* Validity flags are booleans, exactly as for the boolean column. */
+  gboolean int32_is_valids[] = {FALSE, TRUE, TRUE};
+  if (!garrow_int32_array_builder_append_values(int32_builder,
+                                                int32_values,
+                                                n_records,
+                                                int32_is_valids,
+                                                n_records,
+                                                &error)) {
+    g_print("failed to append int32 values: %s\n", error->message);
+    g_error_free(error);
+    g_object_unref(builder);
+    return NULL;
+  }
+
+  GArrowRecordBatch *record_batch =
+    garrow_record_batch_builder_flush(builder, &error);
+  if (!record_batch) {
+    g_print("failed to build record batch: %s\n", error->message);
+    g_error_free(error);
+    g_object_unref(builder);
+    return NULL;
+  }
+
+  g_object_unref(builder);
+
+  return record_batch;
+}
+
+/*
+ * Connect to 127.0.0.1:PORT and send five record batches as an Arrow
+ * record batch stream.
+ *
+ * Usage: send-network PORT
+ *
+ * PORT must be a decimal number in the range 1-65535. Every object
+ * created here is released on all exit paths through the single
+ * `cleanup` label.
+ *
+ * Returns: EXIT_SUCCESS when all record batches were written,
+ *   EXIT_FAILURE otherwise.
+ */
+int
+main(int argc, char **argv)
+{
+  if (argc != 2) {
+    g_print("Usage: %s PORT\n", argv[0]);
+    g_print(" e.g.: %s 2929\n", argv[0]);
+    return EXIT_FAILURE;
+  }
+
+  /* atoi() cannot report bad input, so parse the port strictly and
+   * reject anything that is not a complete number in the valid range. */
+  gchar *port_end = NULL;
+  gint64 port = g_ascii_strtoll(argv[1], &port_end, 10);
+  if (port_end == argv[1] || *port_end != '\0' || port < 1 || port > 65535) {
+    g_print("invalid port: %s\n", argv[1]);
+    return EXIT_FAILURE;
+  }
+
+  int exit_code = EXIT_FAILURE;
+  const gsize n_record_batches = 5;
+  GError *error = NULL;
+  GSocketConnection *connection = NULL;
+  GArrowSchema *schema = NULL;
+  GArrowGIOOutputStream *output = NULL;
+  GArrowRecordBatchStreamWriter *writer = NULL;
+
+  GSocketClient *client = g_socket_client_new();
+  GSocketAddress *address =
+    g_inet_socket_address_new_from_string("127.0.0.1", (guint)port);
+  connection =
+    g_socket_client_connect(client, G_SOCKET_CONNECTABLE(address), NULL, &error);
+  if (!connection) {
+    g_print("failed to connect: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  schema = build_schema();
+  if (!schema) {
+    goto cleanup;
+  }
+  output =
+    garrow_gio_output_stream_new(
+      g_io_stream_get_output_stream(G_IO_STREAM(connection)));
+  writer =
+    garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(output),
+                                          schema,
+                                          &error);
+  /* The schema is no longer needed here once the writer call returns. */
+  g_clear_object(&schema);
+  if (!writer) {
+    g_print("failed to create writer: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  for (gsize i = 0; i < n_record_batches; i++) {
+    GArrowRecordBatch *record_batch = build_record_batch();
+    if (!record_batch) {
+      goto cleanup;
+    }
+    gboolean success =
+      garrow_record_batch_writer_write_record_batch(
+        GARROW_RECORD_BATCH_WRITER(writer),
+        record_batch,
+        &error);
+    g_object_unref(record_batch);
+    if (!success) {
+      g_print("failed to write record batch: %s\n", error->message);
+      g_error_free(error);
+      goto cleanup;
+    }
+  }
+
+  exit_code = EXIT_SUCCESS;
+
+cleanup:
+  /* g_clear_object() skips pointers that are still NULL, so this block
+   * is safe no matter how far the setup above got. */
+  g_clear_object(&writer);
+  g_clear_object(&output);
+  g_clear_object(&connection);
+  g_clear_object(&address);
+  g_clear_object(&client);
+
+  return exit_code;
+}