You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/07/13 04:53:34 UTC

[arrow] branch master updated: ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a1649a528 ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
8a1649a528 is described below

commit 8a1649a528b2e940dd9183ca666fd44b514bbde1
Author: Sutou Kouhei <ko...@clear-code.com>
AuthorDate: Wed Jul 13 13:53:25 2022 +0900

    ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
    
    Authored-by: Sutou Kouhei <ko...@clear-code.com>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 c_glib/arrow-glib/field.cpp      |   2 +-
 c_glib/example/meson.build       |   8 ++
 c_glib/example/receive-network.c | 212 +++++++++++++++++++++++++++++++++++++++
 c_glib/example/send-network.c    | 187 ++++++++++++++++++++++++++++++++++
 4 files changed, 408 insertions(+), 1 deletion(-)

diff --git a/c_glib/arrow-glib/field.cpp b/c_glib/arrow-glib/field.cpp
index 1bb2dd1812..526f9a6773 100644
--- a/c_glib/arrow-glib/field.cpp
+++ b/c_glib/arrow-glib/field.cpp
@@ -160,7 +160,7 @@ garrow_field_import(gpointer c_abi_schema, GError **error)
 /**
  * garrow_field_new:
  * @name: The name of the field.
- * @data_type: The data type of the field.
+ * @data_type: (transfer full): The data type of the field.
  *
  * Returns: A newly created #GArrowField.
  */
diff --git a/c_glib/example/meson.build b/c_glib/example/meson.build
index e43f1553dd..e2d55d4788 100644
--- a/c_glib/example/meson.build
+++ b/c_glib/example/meson.build
@@ -29,12 +29,20 @@ executable('read-file', 'read-file.c',
 executable('read-stream', 'read-stream.c',
            dependencies: [arrow_glib],
            link_language: 'c')
+executable('receive-network', 'receive-network.c',
+           dependencies: [arrow_glib],
+           link_language: 'c')
+executable('send-network', 'send-network.c',
+           dependencies: [arrow_glib],
+           link_language: 'c')
 
 install_data('README.md',
              'build.c',
              'extension-type.c',
              'read-file.c',
              'read-stream.c',
+             'receive-network.c',
+             'send-network.c',
              install_dir: join_paths(data_dir, meson.project_name(), 'example'))
 
 subdir('lua')
diff --git a/c_glib/example/receive-network.c b/c_glib/example/receive-network.c
new file mode 100644
index 0000000000..aa7aaa0140
--- /dev/null
+++ b/c_glib/example/receive-network.c
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include <arrow-glib/arrow-glib.h>
+
+#ifdef G_OS_UNIX
+#  include <glib-unix.h>
+#  include <signal.h>
+#endif
+
+/*
+ * Handler for the "event" signal of the socket service.
+ *
+ * Prints the local address of a socket once the listener has bound it,
+ * so that the port chosen by g_socket_listener_add_any_inet_port() in
+ * main() can be passed to the sender. Every other event is ignored.
+ *
+ * @listener: The listener that emitted the signal (unused).
+ * @event: The event being reported.
+ * @socket: The socket the event is about. It is only borrowed for the
+ *   duration of the signal emission, so it must not be unreffed here.
+ * @user_data: Unused.
+ */
+static void
+service_event(GSocketListener *listener,
+              GSocketListenerEvent event,
+              GSocket *socket,
+              gpointer user_data)
+{
+  if (event != G_SOCKET_LISTENER_BOUND) {
+    return;
+  }
+
+  GError *error = NULL;
+  GSocketAddress *local_address = g_socket_get_local_address(socket, &error);
+  if (!local_address) {
+    g_print("failed to get local address: %s\n", error->message);
+    g_error_free(error);
+    /* Do not unref @socket: this handler never took a reference to it. */
+    return;
+  }
+  gchar *local_address_string =
+    g_socket_connectable_to_string(G_SOCKET_CONNECTABLE(local_address));
+  g_print("address: %s\n", local_address_string);
+  g_free(local_address_string);
+  g_object_unref(local_address);
+}
+
+/*
+ * Print the values of @array on a single line as "[v0, v1, ...]".
+ *
+ * Only the integer and floating point value types listed in the switch
+ * below are handled. For any other value type nothing is printed between
+ * the brackets.
+ */
+static void
+print_array(GArrowArray *array)
+{
+  const GArrowType type_id = garrow_array_get_value_type(array);
+
+  g_print("[");
+  const gint64 length = garrow_array_get_length(array);
+
+  /* Expands to one switch case that casts @array to the concrete array
+   * class and prints each value with the given printf-style format. */
+#define PRINT_VALUES_CASE(type, Type, TYPE, format)                     \
+  case GARROW_TYPE_ ## TYPE:                                            \
+    {                                                                   \
+      GArrow ## Type ## Array *typed_array =                            \
+        GARROW_ ## TYPE ## _ARRAY(array);                               \
+      for (gint64 nth = 0; nth < length; nth++) {                       \
+        if (nth != 0) {                                                 \
+          g_print(", ");                                                \
+        }                                                               \
+        g_print(format,                                                 \
+                garrow_ ## type ## _array_get_value(typed_array, nth)); \
+      }                                                                 \
+    }                                                                   \
+    break
+
+  switch (type_id) {
+    PRINT_VALUES_CASE(uint8,  UInt8,  UINT8,  "%hhu");
+    PRINT_VALUES_CASE(uint16, UInt16, UINT16, "%" G_GUINT16_FORMAT);
+    PRINT_VALUES_CASE(uint32, UInt32, UINT32, "%" G_GUINT32_FORMAT);
+    PRINT_VALUES_CASE(uint64, UInt64, UINT64, "%" G_GUINT64_FORMAT);
+    PRINT_VALUES_CASE(int8,   Int8,   INT8,   "%hhd");
+    PRINT_VALUES_CASE(int16,  Int16,  INT16,  "%" G_GINT16_FORMAT);
+    PRINT_VALUES_CASE(int32,  Int32,  INT32,  "%" G_GINT32_FORMAT);
+    PRINT_VALUES_CASE(int64,  Int64,  INT64,  "%" G_GINT64_FORMAT);
+    PRINT_VALUES_CASE(float,  Float,  FLOAT,  "%g");
+    PRINT_VALUES_CASE(double, Double, DOUBLE, "%g");
+  default:
+    break;
+  }
+#undef PRINT_VALUES_CASE
+
+  g_print("]\n");
+}
+
+/*
+ * Print every column of @record_batch, one line per column, in the form
+ * "columns[INDEX](NAME): [VALUES]".
+ */
+static void
+print_record_batch(GArrowRecordBatch *record_batch)
+{
+  const guint column_count = garrow_record_batch_get_n_columns(record_batch);
+
+  for (guint index = 0; index < column_count; index++) {
+    const gchar *column_name =
+      garrow_record_batch_get_column_name(record_batch, index);
+    g_print("columns[%u](%s): ", index, column_name);
+
+    /* The column array is released as soon as it has been printed. */
+    GArrowArray *column =
+      garrow_record_batch_get_column_data(record_batch, index);
+    print_array(column);
+    g_object_unref(column);
+  }
+}
+
+/*
+ * Handler for the "incoming" signal of the socket service.
+ *
+ * Reads an Arrow record batch stream from @connection and prints each
+ * record batch until the reader reports the end of the stream or a read
+ * fails. Failures are reported with g_print().
+ *
+ * @service: The service that emitted the signal (unused).
+ * @connection: The accepted connection to read from.
+ * @source_object: Unused.
+ * @user_data: Unused.
+ *
+ * Returns: Always FALSE. This is a signal handler result, not a process
+ *   exit status, so EXIT_FAILURE must not be returned from here.
+ */
+static gboolean
+service_incoming(GSocketService *service,
+                 GSocketConnection *connection,
+                 GObject *source_object,
+                 gpointer user_data)
+{
+  GArrowGIOInputStream *input =
+    garrow_gio_input_stream_new(
+      g_io_stream_get_input_stream(G_IO_STREAM(connection)));
+  GError *error = NULL;
+  GArrowRecordBatchStreamReader *reader =
+    garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(input), &error);
+  if (!reader) {
+    g_print("failed to create reader: %s\n", error->message);
+    g_error_free(error);
+    g_object_unref(input);
+    return FALSE;
+  }
+
+  while (TRUE) {
+    GArrowRecordBatch *record_batch =
+      garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(reader),
+                                           &error);
+    if (error) {
+      g_print("failed to read the next record batch: %s\n", error->message);
+      g_error_free(error);
+      break;
+    }
+
+    /* No error and no record batch: the stream has ended. */
+    if (!record_batch) {
+      break;
+    }
+
+    print_record_batch(record_batch);
+    g_object_unref(record_batch);
+  }
+
+  /* Shared cleanup for both the normal end and the read failure. */
+  g_object_unref(reader);
+  g_object_unref(input);
+
+  return FALSE;
+}
+
+#ifdef G_OS_UNIX
+/* State handed to stop() through its user_data pointer. */
+typedef struct {
+  GSocketService *service; /* Service released by stop(). */
+  GMainLoop *loop;         /* Main loop that stop() quits. */
+} StopData;
+
+/*
+ * Source callback run when a termination signal has been received.
+ *
+ * Releases the socket service and asks the main loop to quit.
+ *
+ * @user_data: Pointer to the StopData that holds the service and the
+ *   main loop.
+ *
+ * Returns: G_SOURCE_REMOVE so that the signal source is not run again.
+ */
+static gboolean
+stop(gpointer user_data)
+{
+  StopData *data = user_data;
+  /* main() registers this callback for both SIGINT and SIGTERM with the
+   * same StopData. g_clear_object() resets the pointer to NULL after
+   * unreffing, so a second dispatch does not unref the service again. */
+  g_clear_object(&data->service);
+  g_main_loop_quit(data->loop);
+  return G_SOURCE_REMOVE;
+}
+#endif
+
+/*
+ * Entry point of the receiver example.
+ *
+ * Starts a threaded socket service listening on any available inet
+ * port; service_event() prints the bound address and service_incoming()
+ * prints the record batches received on each connection. The main loop
+ * then runs until it is quit. On UNIX, stop() quits it on SIGINT or
+ * SIGTERM.
+ *
+ * Returns: EXIT_SUCCESS after the main loop has finished, EXIT_FAILURE
+ *   when no listening port could be added.
+ */
+int
+main(int argc, char **argv)
+{
+  GSocketService *service = g_threaded_socket_service_new(-1);
+  g_signal_connect(service, "event", G_CALLBACK(service_event), NULL);
+  g_signal_connect(service, "incoming", G_CALLBACK(service_incoming), NULL);
+
+  GError *error = NULL;
+  gboolean success =
+    g_socket_listener_add_any_inet_port(G_SOCKET_LISTENER(service),
+                                        NULL,
+                                        &error);
+  if (!success) {
+    g_print("failed to add a listen IP address: %s\n", error->message);
+    g_error_free(error);
+    /* Nothing else will release the service on this path. */
+    g_object_unref(service);
+    return EXIT_FAILURE;
+  }
+
+  g_socket_service_start(service);
+
+  GMainLoop *loop = g_main_loop_new(NULL, FALSE);
+#ifdef G_OS_UNIX
+  /* stop() releases the service and quits the loop; the same data is
+   * shared by both signal sources. */
+  StopData data;
+  data.service = service;
+  data.loop = loop;
+  g_unix_signal_add(SIGINT, stop, &data);
+  g_unix_signal_add(SIGTERM, stop, &data);
+#else
+  /* TODO: Implement graceful stop. */
+#endif
+  g_main_loop_run(loop);
+  g_main_loop_unref(loop);
+
+  return EXIT_SUCCESS;
+}
diff --git a/c_glib/example/send-network.c b/c_glib/example/send-network.c
new file mode 100644
index 0000000000..d298c5a173
--- /dev/null
+++ b/c_glib/example/send-network.c
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include <arrow-glib/arrow-glib.h>
+
+/*
+ * Build the schema shared by the stream writer and the record batches:
+ * a "boolean" field of boolean type followed by an "int32" field of
+ * int32 type.
+ *
+ * Returns: The value returned by garrow_schema_new() for the two fields.
+ */
+static GArrowSchema *
+build_schema(void)
+{
+  GArrowDataType *boolean_type =
+    GARROW_DATA_TYPE(garrow_boolean_data_type_new());
+  GArrowField *boolean_field = garrow_field_new("boolean", boolean_type);
+  GList *field_list = g_list_append(NULL, boolean_field);
+
+  GArrowDataType *int32_type =
+    GARROW_DATA_TYPE(garrow_int32_data_type_new());
+  GArrowField *int32_field = garrow_field_new("int32", int32_type);
+  field_list = g_list_append(field_list, int32_field);
+
+  GArrowSchema *result = garrow_schema_new(field_list);
+  /* Drop the list together with its references to the fields. */
+  g_list_free_full(field_list, g_object_unref);
+
+  return result;
+}
+
+/*
+ * Build a record batch of three rows that matches build_schema():
+ *
+ *   boolean: TRUE, (null), FALSE
+ *   int32:   (null), 11, 111
+ *
+ * Failures are reported with g_print().
+ *
+ * Returns: The new record batch, which the caller must release with
+ *   g_object_unref(), or NULL on failure.
+ */
+static GArrowRecordBatch *
+build_record_batch(void)
+{
+  GArrowSchema *schema = build_schema();
+  if (!schema) {
+    return NULL;
+  }
+  GError *error = NULL;
+  GArrowRecordBatchBuilder *builder =
+    garrow_record_batch_builder_new(schema, &error);
+  g_object_unref(schema);
+  if (!builder) {
+    g_print("failed to build record batch builder: %s\n", error->message);
+    g_error_free(error);
+    return NULL;
+  }
+
+  const gint64 n_records = 3;
+  GArrowRecordBatch *record_batch = NULL;
+
+  /* The column builders are borrowed from @builder: they are never
+   * unreffed here, on the success path or on the failure paths. */
+  GArrowBooleanArrayBuilder *boolean_builder =
+    GARROW_BOOLEAN_ARRAY_BUILDER(
+      garrow_record_batch_builder_get_column_builder(builder, 0));
+  gboolean boolean_values[] = {TRUE, TRUE, FALSE};
+  gboolean boolean_is_valids[] = {TRUE, FALSE, TRUE};
+  if (!garrow_boolean_array_builder_append_values(boolean_builder,
+                                                  boolean_values,
+                                                  n_records,
+                                                  boolean_is_valids,
+                                                  n_records,
+                                                  &error)) {
+    g_print("failed to append boolean values: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  GArrowInt32ArrayBuilder *int32_builder =
+    GARROW_INT32_ARRAY_BUILDER(
+      garrow_record_batch_builder_get_column_builder(builder, 1));
+  gint32 int32_values[] = {1, 11, 111};
+  /* Validity flags are booleans, not int32 values. */
+  gboolean int32_is_valids[] = {FALSE, TRUE, TRUE};
+  if (!garrow_int32_array_builder_append_values(int32_builder,
+                                                int32_values,
+                                                n_records,
+                                                int32_is_valids,
+                                                n_records,
+                                                &error)) {
+    g_print("failed to append int32 values: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  record_batch = garrow_record_batch_builder_flush(builder, &error);
+  if (!record_batch) {
+    g_print("failed to build record batch: %s\n", error->message);
+    g_error_free(error);
+  }
+
+cleanup:
+  g_object_unref(builder);
+
+  return record_batch;
+}
+
+/*
+ * Entry point of the sender example.
+ *
+ * Connects to 127.0.0.1 on the port given as the only command line
+ * argument and writes five record batches built by build_record_batch()
+ * as an Arrow record batch stream. Failures are reported with g_print().
+ *
+ * Returns: EXIT_SUCCESS when all record batches were written,
+ *   EXIT_FAILURE on a usage error, an invalid port or any other failure.
+ */
+int
+main(int argc, char **argv)
+{
+  if (argc != 2) {
+    g_print("Usage: %s PORT\n", argv[0]);
+    g_print(" e.g.: %s 2929\n", argv[0]);
+    return EXIT_FAILURE;
+  }
+
+  /* Reject anything that is not a complete decimal number in 1..65535
+   * instead of silently connecting to whatever atoi() would produce. */
+  gchar *port_end = NULL;
+  guint64 parsed_port = g_ascii_strtoull(argv[1], &port_end, 10);
+  if (port_end == argv[1] ||
+      *port_end != '\0' ||
+      parsed_port == 0 ||
+      parsed_port > G_MAXUINT16) {
+    g_print("invalid port: %s\n", argv[1]);
+    return EXIT_FAILURE;
+  }
+  guint16 port = (guint16)parsed_port;
+
+  /* Everything released at the cleanup label starts out as NULL so that
+   * any failure can jump there regardless of how far setup got. */
+  int exit_code = EXIT_FAILURE;
+  GError *error = NULL;
+  GSocketConnection *connection = NULL;
+  GArrowSchema *schema = NULL;
+  GArrowGIOOutputStream *output = NULL;
+  GArrowRecordBatchStreamWriter *writer = NULL;
+
+  GSocketClient *client = g_socket_client_new();
+  GSocketAddress *address = g_inet_socket_address_new_from_string("127.0.0.1",
+                                                                  port);
+  connection =
+    g_socket_client_connect(client, G_SOCKET_CONNECTABLE(address), NULL, &error);
+  if (!connection) {
+    g_print("failed to connect: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  schema = build_schema();
+  if (!schema) {
+    goto cleanup;
+  }
+  output =
+    garrow_gio_output_stream_new(
+      g_io_stream_get_output_stream(G_IO_STREAM(connection)));
+  writer =
+    garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(output),
+                                          schema,
+                                          &error);
+  if (!writer) {
+    g_print("failed to create writer: %s\n", error->message);
+    g_error_free(error);
+    goto cleanup;
+  }
+
+  gsize n_record_batches = 5;
+  gsize i;
+  for (i = 0; i < n_record_batches; i++) {
+    GArrowRecordBatch *record_batch = build_record_batch();
+    if (!record_batch) {
+      goto cleanup;
+    }
+    gboolean success =
+      garrow_record_batch_writer_write_record_batch(
+        GARROW_RECORD_BATCH_WRITER(writer),
+        record_batch,
+        &error);
+    g_object_unref(record_batch);
+    if (!success) {
+      g_print("failed to write record batch: %s\n", error->message);
+      g_error_free(error);
+      goto cleanup;
+    }
+  }
+
+  exit_code = EXIT_SUCCESS;
+
+cleanup:
+  /* g_clear_object() ignores pointers that are still NULL. The writer
+   * and the output stream are released before the connection. */
+  g_clear_object(&writer);
+  g_clear_object(&output);
+  g_clear_object(&schema);
+  g_clear_object(&connection);
+  g_clear_object(&address);
+  g_clear_object(&client);
+
+  return exit_code;
+}