You are viewing a plain-text version of this content. Its canonical link was a hyperlink that this plain-text version does not preserve.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/16 03:46:36 UTC

arrow git commit: ARROW-1217: [GLib] Add GInputStream based arrow::io::RandomAccessFile

Repository: arrow
Updated Branches:
  refs/heads/master dc4216f02 -> e438e1517


ARROW-1217: [GLib] Add GInputStream based arrow::io::RandomAccessFile

`GInputStream` supports many input types, such as local files, in-memory data, and sockets (including TLS). Wrapping it as an `arrow::io::RandomAccessFile` lets Arrow read data from any of these sources.

Author: Kouhei Sutou <ko...@clear-code.com>

Closes #845 from kou/glib-support-ginput-stream and squashes the following commits:

59247ba7 [Kouhei Sutou] [GLib] Disable GSettings related codes in Go example
268a5fb3 [Kouhei Sutou] [GLib] Remove needless gdk-3.0 dependency in Go example
9e54e7b6 [Kouhei Sutou] [GLib] Update dependency in Go example
ec2ddaca [Kouhei Sutou] [GLib] Add GInputStream based arrow::io::RandomAccessFile


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e438e151
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e438e151
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e438e151

Branch: refs/heads/master
Commit: e438e15172b94b5bad80ba39d838195b4748d7c3
Parents: dc4216f
Author: Kouhei Sutou <ko...@clear-code.com>
Authored: Sat Jul 15 23:46:31 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 15 23:46:31 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/Makefile.am           |   7 +-
 c_glib/arrow-glib/input-stream.cpp      | 207 +++++++++++++++++++++++++++
 c_glib/arrow-glib/input-stream.h        |  52 +++++++
 c_glib/configure.ac                     |   2 +-
 c_glib/example/go/arrow-1.0/arrow.go.in |   1 +
 c_glib/test/run-test.rb                 |   1 +
 c_glib/test/test-gio-input-stream.rb    |  53 +++++++
 ci/travis_before_script_c_glib.sh       |   6 +
 8 files changed, 326 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/Makefile.am
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/Makefile.am b/c_glib/arrow-glib/Makefile.am
index f0bb601..f33a8d1 100644
--- a/c_glib/arrow-glib/Makefile.am
+++ b/c_glib/arrow-glib/Makefile.am
@@ -194,9 +194,12 @@ INTROSPECTION_COMPILER_ARGS =
 if HAVE_INTROSPECTION
 Arrow-1.0.gir: libarrow-glib.la
 Arrow_1_0_gir_PACKAGES =			\
-	gobject-2.0
+	gobject-2.0				\
+	gio-2.0
 Arrow_1_0_gir_EXPORT_PACKAGES = arrow
-Arrow_1_0_gir_INCLUDES = GObject-2.0
+Arrow_1_0_gir_INCLUDES =			\
+	GObject-2.0				\
+	Gio-2.0
 Arrow_1_0_gir_CFLAGS =				\
 	$(AM_CPPFLAGS)
 Arrow_1_0_gir_LIBS = libarrow-glib.la

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/input-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index d81e4a3..37ac2fa 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -32,6 +32,9 @@
 #include <arrow-glib/readable.hpp>
 #include <arrow-glib/tensor.hpp>
 
+#include <iostream>
+#include <sstream>
+
 G_BEGIN_DECLS
 
 /**
@@ -375,6 +378,210 @@ garrow_memory_mapped_input_stream_new(const gchar *path,
 
 G_END_DECLS
 
+namespace garrow {
+  // Adapts a GInputStream to arrow::io::RandomAccessFile. Tell, Seek and
+  // GetSize need a GSeekable stream and otherwise return NotImplemented.
+  class GIOInputStream final : public arrow::io::RandomAccessFile {
+  public:
+    // Takes a new reference on input_stream; released in the destructor.
+    explicit GIOInputStream(GInputStream *input_stream) :
+      input_stream_(input_stream) {
+      g_object_ref(input_stream_);
+    }
+    ~GIOInputStream() { g_object_unref(input_stream_); }
+    // Holds one GObject reference, so a copy would unref it twice.
+    GIOInputStream(const GIOInputStream &) = delete;
+    GIOInputStream &operator=(const GIOInputStream &) = delete;
+
+    // Returns the wrapped stream without adding a reference.
+    GInputStream *get_input_stream() { return input_stream_; }
+
+    arrow::Status Close() override {
+      GError *error = NULL;
+      if (g_input_stream_close(input_stream_, NULL, &error)) {
+        return arrow::Status::OK();
+      } else {
+        return io_error_to_status(error, "[gio-input-stream][close]");
+      }
+    }
+
+    arrow::Status Tell(int64_t *position) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][tell]");
+      }
+      *position = g_seekable_tell(G_SEEKABLE(input_stream_));
+      return arrow::Status::OK();
+    }
+
+    // *n_read_bytes may be smaller than n_bytes; GIO returns -1 on error.
+    arrow::Status Read(int64_t n_bytes,
+                       int64_t *n_read_bytes,
+                       uint8_t *out) override {
+      GError *error = NULL;
+      *n_read_bytes = g_input_stream_read(input_stream_,
+                                          out,
+                                          n_bytes,
+                                          NULL,
+                                          &error);
+      if (*n_read_bytes == -1) {
+        return io_error_to_status(error, "[gio-input-stream][read]");
+      } else {
+        return arrow::Status::OK();
+      }
+    }
+
+    // Allocates n_bytes from the default pool, then shrinks the buffer to
+    // the number of bytes actually read.
+    arrow::Status Read(int64_t n_bytes,
+                       std::shared_ptr<arrow::Buffer> *out) override {
+      arrow::MemoryPool *pool = arrow::default_memory_pool();
+      std::shared_ptr<arrow::ResizableBuffer> buffer;
+      ARROW_RETURN_NOT_OK(AllocateResizableBuffer(pool, n_bytes, &buffer));
+
+      GError *error = NULL;
+      auto n_read_bytes = g_input_stream_read(input_stream_,
+                                              buffer->mutable_data(),
+                                              n_bytes,
+                                              NULL,
+                                              &error);
+      if (n_read_bytes == -1) {
+        return io_error_to_status(error, "[gio-input-stream][read][buffer]");
+      } else {
+        if (n_read_bytes < n_bytes) {
+          ARROW_RETURN_NOT_OK(buffer->Resize(n_read_bytes));
+        }
+        *out = buffer;
+        return arrow::Status::OK();
+      }
+    }
+
+    // Seeks to an absolute byte offset (G_SEEK_SET).
+    arrow::Status Seek(int64_t position) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][seek]");
+      }
+      GError *error = NULL;
+      if (g_seekable_seek(G_SEEKABLE(input_stream_),
+                          position,
+                          G_SEEK_SET,
+                          NULL,
+                          &error)) {
+        return arrow::Status::OK();
+      } else {
+        return io_error_to_status(error, "[gio-input-stream][seek]");
+      }
+    }
+
+    // Seeks to the end to read the size, then restores the old position.
+    arrow::Status GetSize(int64_t *size) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][size]");
+      }
+      auto current_position = g_seekable_tell(G_SEEKABLE(input_stream_));
+      GError *error = NULL;
+      if (!g_seekable_seek(G_SEEKABLE(input_stream_),
+                           0,
+                           G_SEEK_END,
+                           NULL,
+                           &error)) {
+        return io_error_to_status(error, "[gio-input-stream][size][seek]");
+      }
+      *size = g_seekable_tell(G_SEEKABLE(input_stream_));
+      if (!g_seekable_seek(G_SEEKABLE(input_stream_),
+                           current_position,
+                           G_SEEK_SET,
+                           NULL,
+                           &error)) {
+        return io_error_to_status(error,
+                                  "[gio-input-stream][size][seek][restore]");
+      }
+      return arrow::Status::OK();
+    }
+
+    bool supports_zero_copy() const override { return false; }
+
+  private:
+    GInputStream *input_stream_;
+
+    // Builds "<context> not seekable input stream: <GType name>".
+    arrow::Status not_seekable_status(const char *context) const {
+      std::string message(context);
+      message += " not seekable input stream: <";
+      message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_));
+      message += ">";
+      return arrow::Status::NotImplemented(message);
+    }
+
+    // Converts a GError into Status::IOError; takes ownership of error.
+    static arrow::Status io_error_to_status(GError *error,
+                                            const char *context) {
+      std::stringstream message;
+      message << context << ": " << g_quark_to_string(error->domain);
+      message << "(" << error->code << "): " << error->message;
+      g_error_free(error);
+      return arrow::Status::IOError(message.str());
+    }
+  };
+}  // namespace garrow
+
+G_BEGIN_DECLS
+
+G_DEFINE_TYPE(GArrowGIOInputStream,                \
+              garrow_gio_input_stream,             \
+              GARROW_TYPE_SEEKABLE_INPUT_STREAM);
+
+static void
+garrow_gio_input_stream_init(GArrowGIOInputStream *object)
+{
+}
+
+static void
+garrow_gio_input_stream_class_init(GArrowGIOInputStreamClass *klass)
+{
+}
+
+/**
+ * garrow_gio_input_stream_new:
+ * @gio_input_stream: The #GInputStream to read; a reference is kept.
+ *
+ * Returns: A newly created #GArrowGIOInputStream.
+ *
+ * Since: 0.5.0
+ */
+GArrowGIOInputStream *
+garrow_gio_input_stream_new(GInputStream *gio_input_stream)
+{
+  g_return_val_if_fail(G_IS_INPUT_STREAM(gio_input_stream), NULL);
+  auto arrow_input_stream =
+    std::make_shared<garrow::GIOInputStream>(gio_input_stream);
+  auto object = g_object_new(GARROW_TYPE_GIO_INPUT_STREAM,
+                             "input-stream", &arrow_input_stream,
+                             NULL);
+  return GARROW_GIO_INPUT_STREAM(object);
+}
+
+/**
+ * garrow_gio_input_stream_get_gio_input_stream:
+ * @input_stream: A #GArrowGIOInputStream.
+ *
+ * Returns: (transfer none): The wrapped #GInputStream.
+ *
+ * Since: 0.5.0
+ */
+GInputStream *
+garrow_gio_input_stream_get_gio_input_stream(GArrowGIOInputStream *input_stream)
+{
+  g_return_val_if_fail(GARROW_IS_GIO_INPUT_STREAM(input_stream), NULL);
+  auto arrow_input_stream =
+    garrow_input_stream_get_raw(GARROW_INPUT_STREAM(input_stream));
+  auto arrow_gio_input_stream =
+    std::static_pointer_cast<garrow::GIOInputStream>(arrow_input_stream);
+  return arrow_gio_input_stream->get_input_stream();
+}
+
+
+G_END_DECLS
+
 GArrowInputStream *
 garrow_input_stream_new_raw(std::shared_ptr<arrow::io::InputStream> *arrow_input_stream)
 {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/input-stream.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h
index 8a4d362..120839d 100644
--- a/c_glib/arrow-glib/input-stream.h
+++ b/c_glib/arrow-glib/input-stream.h
@@ -19,6 +19,8 @@
 
 #pragma once
 
+#include <gio/gio.h>
+
 #include <arrow-glib/buffer.h>
 #include <arrow-glib/tensor.h>
 
@@ -229,4 +231,54 @@ GType garrow_memory_mapped_input_stream_get_type(void) G_GNUC_CONST;
 GArrowMemoryMappedInputStream *garrow_memory_mapped_input_stream_new(const gchar *path,
                                                                      GError **error);
 
+
+#define GARROW_TYPE_GIO_INPUT_STREAM            \
+  (garrow_gio_input_stream_get_type())
+#define GARROW_GIO_INPUT_STREAM(obj)                            \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),                            \
+                              GARROW_TYPE_GIO_INPUT_STREAM,     \
+                              GArrowGIOInputStream))
+#define GARROW_GIO_INPUT_STREAM_CLASS(klass)                    \
+  (G_TYPE_CHECK_CLASS_CAST((klass),                             \
+                           GARROW_TYPE_GIO_INPUT_STREAM,        \
+                           GArrowGIOInputStreamClass))
+#define GARROW_IS_GIO_INPUT_STREAM(obj)                         \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),                            \
+                              GARROW_TYPE_GIO_INPUT_STREAM))
+#define GARROW_IS_GIO_INPUT_STREAM_CLASS(klass)                 \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),                             \
+                           GARROW_TYPE_GIO_INPUT_STREAM))
+#define GARROW_GIO_INPUT_STREAM_GET_CLASS(obj)                  \
+  (G_TYPE_INSTANCE_GET_CLASS((obj),                             \
+                             GARROW_TYPE_GIO_INPUT_STREAM,      \
+                             GArrowGIOInputStreamClass))
+
+typedef struct _GArrowGIOInputStream         GArrowGIOInputStream;
+#ifndef __GTK_DOC_IGNORE__
+typedef struct _GArrowGIOInputStreamClass    GArrowGIOInputStreamClass;
+#endif
+
+/**
+ * GArrowGIOInputStream:
+ *
+ * An input stream that reads from a wrapped `GInputStream`.
+ */
+struct _GArrowGIOInputStream
+{
+  /*< private >*/
+  GArrowSeekableInputStream parent_instance;
+};
+
+#ifndef __GTK_DOC_IGNORE__
+struct _GArrowGIOInputStreamClass
+{
+  GArrowSeekableInputStreamClass parent_class;
+};
+#endif
+
+GType garrow_gio_input_stream_get_type(void) G_GNUC_CONST;
+
+GArrowGIOInputStream *garrow_gio_input_stream_new(GInputStream *gio_input_stream);
+GInputStream *garrow_gio_input_stream_get_gio_input_stream(GArrowGIOInputStream *input_stream);
+
 G_END_DECLS

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/configure.ac
----------------------------------------------------------------------
diff --git a/c_glib/configure.ac b/c_glib/configure.ac
index e010d96..d4e828b 100644
--- a/c_glib/configure.ac
+++ b/c_glib/configure.ac
@@ -56,7 +56,7 @@ fi
 AC_SUBST(GARROW_CFLAGS)
 AC_SUBST(GARROW_CXXFLAGS)
 
-AM_PATH_GLIB_2_0([2.32.4], [], [], [gobject])
+AM_PATH_GLIB_2_0([2.32.4], [], [], [gobject gio])
 
 GOBJECT_INTROSPECTION_REQUIRE([1.32.1])
 GTK_DOC_CHECK([1.18-2])

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/example/go/arrow-1.0/arrow.go.in
----------------------------------------------------------------------
diff --git a/c_glib/example/go/arrow-1.0/arrow.go.in b/c_glib/example/go/arrow-1.0/arrow.go.in
index bd124af..08ee13d 100644
--- a/c_glib/example/go/arrow-1.0/arrow.go.in
+++ b/c_glib/example/go/arrow-1.0/arrow.go.in
@@ -31,6 +31,7 @@ import "unsafe"
 import (
 	"gir/glib-2.0"
 	"gir/gobject-2.0"
+	"gir/gio-2.0"
 )
 
 [<.go_utils>]

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/test/run-test.rb
----------------------------------------------------------------------
diff --git a/c_glib/test/run-test.rb b/c_glib/test/run-test.rb
index 43bb220..75ff34f 100755
--- a/c_glib/test/run-test.rb
+++ b/c_glib/test/run-test.rb
@@ -31,6 +31,7 @@ ENV["GI_TYPELIB_PATH"] = [
 
 require "gi"
 
+Gio = GI.load("Gio")
 Arrow = GI.load("Arrow")
 module Arrow
   class Buffer

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/test/test-gio-input-stream.rb
----------------------------------------------------------------------
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
new file mode 100644
index 0000000..baa978e
--- /dev/null
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class TestGIOInputStream < Test::Unit::TestCase
+  def test_reader_backend
+    tempfile = Tempfile.open("arrow-gio-input-stream")
+    output = Arrow::FileOutputStream.new(tempfile.path, false)
+    begin
+      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+      schema = Arrow::Schema.new([field])
+      file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
+      begin
+        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        file_writer.write_record_batch(record_batch)
+      ensure
+        file_writer.close
+      end
+    ensure
+      output.close
+    end
+
+    file = Gio::File.new_for_path(tempfile.path)
+    input_stream = file.read
+    input = Arrow::GIOInputStream.new(input_stream)
+    begin
+      file_reader = Arrow::RecordBatchFileReader.new(input)
+      assert_equal(["enabled"],
+                   file_reader.schema.fields.collect(&:name))
+    ensure
+      input.close
+    end
+  end
+
+  def test_getter
+    input_stream = Gio::MemoryInputStream.new("Hello")
+    input = Arrow::GIOInputStream.new(input_stream)
+    assert_equal(input_stream, input.gio_input_stream)
+  end
+end

http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/ci/travis_before_script_c_glib.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_c_glib.sh b/ci/travis_before_script_c_glib.sh
index 6387f4d..6547ea4 100755
--- a/ci/travis_before_script_c_glib.sh
+++ b/ci/travis_before_script_c_glib.sh
@@ -44,6 +44,12 @@ luarocks install lgi
 
 go get github.com/linuxdeepin/go-gir-generator || :
 pushd $GOPATH/src/github.com/linuxdeepin/go-gir-generator
+rm lib.in/gio-2.0/gdk_workaround.go
+mv lib.in/gio-2.0/config.json{,.orig}
+sed \
+  -e 's/\("Settings",\)/\/\/ \1/g' \
+  -e 's/\("SettingsBackend",\)/\/\/ \1/g' \
+  lib.in/gio-2.0/config.json.orig > lib.in/gio-2.0/config.json
 mv Makefile{,.orig}
 sed -e 's/ gudev-1.0//' Makefile.orig > Makefile
 mkdir -p out/src/gir/gudev-1.0