You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/16 03:46:36 UTC
arrow git commit: ARROW-1217: [GLib] Add GInputStream based
arrow::io::RandomAccessFile
Repository: arrow
Updated Branches:
refs/heads/master dc4216f02 -> e438e1517
ARROW-1217: [GLib] Add GInputStream based arrow::io::RandomAccessFile
`GInputStream` supports many kinds of input, such as local files, in-memory data, and sockets (including TLS). Wrapping it as an `arrow::io::RandomAccessFile` makes it possible to read Arrow data from all of these sources.
Author: Kouhei Sutou <ko...@clear-code.com>
Closes #845 from kou/glib-support-ginput-stream and squashes the following commits:
59247ba7 [Kouhei Sutou] [GLib] Disable GSettings related codes in Go example
268a5fb3 [Kouhei Sutou] [GLib] Remove needless gdk-3.0 dependency in Go example
9e54e7b6 [Kouhei Sutou] [GLib] Update dependency in Go example
ec2ddaca [Kouhei Sutou] [GLib] Add GInputStream based arrow::io::RandomAccessFile
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e438e151
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e438e151
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e438e151
Branch: refs/heads/master
Commit: e438e15172b94b5bad80ba39d838195b4748d7c3
Parents: dc4216f
Author: Kouhei Sutou <ko...@clear-code.com>
Authored: Sat Jul 15 23:46:31 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 15 23:46:31 2017 -0400
----------------------------------------------------------------------
c_glib/arrow-glib/Makefile.am | 7 +-
c_glib/arrow-glib/input-stream.cpp | 207 +++++++++++++++++++++++++++
c_glib/arrow-glib/input-stream.h | 52 +++++++
c_glib/configure.ac | 2 +-
c_glib/example/go/arrow-1.0/arrow.go.in | 1 +
c_glib/test/run-test.rb | 1 +
c_glib/test/test-gio-input-stream.rb | 53 +++++++
ci/travis_before_script_c_glib.sh | 6 +
8 files changed, 326 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/Makefile.am
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/Makefile.am b/c_glib/arrow-glib/Makefile.am
index f0bb601..f33a8d1 100644
--- a/c_glib/arrow-glib/Makefile.am
+++ b/c_glib/arrow-glib/Makefile.am
@@ -194,9 +194,12 @@ INTROSPECTION_COMPILER_ARGS =
if HAVE_INTROSPECTION
Arrow-1.0.gir: libarrow-glib.la
Arrow_1_0_gir_PACKAGES = \
- gobject-2.0
+ gobject-2.0 \
+ gio-2.0
Arrow_1_0_gir_EXPORT_PACKAGES = arrow
-Arrow_1_0_gir_INCLUDES = GObject-2.0
+Arrow_1_0_gir_INCLUDES = \
+ GObject-2.0 \
+ Gio-2.0
Arrow_1_0_gir_CFLAGS = \
$(AM_CPPFLAGS)
Arrow_1_0_gir_LIBS = libarrow-glib.la
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/input-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index d81e4a3..37ac2fa 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -32,6 +32,9 @@
#include <arrow-glib/readable.hpp>
#include <arrow-glib/tensor.hpp>
+#include <iostream>
+#include <sstream>
+
G_BEGIN_DECLS
/**
@@ -375,6 +378,210 @@ garrow_memory_mapped_input_stream_new(const gchar *path,
G_END_DECLS
+namespace garrow {
+  /*
+   * GIOInputStream:
+   *
+   * arrow::io::RandomAccessFile implementation backed by a GInputStream.
+   * Tell(), Seek() and GetSize() require the wrapped stream to implement
+   * GSeekable; for any other stream they return Status::NotImplemented.
+   * GError failures reported by GIO are converted to Status::IOError.
+   */
+  class GIOInputStream : public arrow::io::RandomAccessFile {
+  public:
+    // Takes a new reference on input_stream; it is released in the
+    // destructor.
+    explicit GIOInputStream(GInputStream *input_stream) :
+      input_stream_(input_stream) {
+      g_object_ref(input_stream_);
+    }
+
+    // A copy would share input_stream_ without taking another reference,
+    // and both destructors would then unref it, so copying is forbidden.
+    GIOInputStream(const GIOInputStream &) = delete;
+    GIOInputStream &operator=(const GIOInputStream &) = delete;
+
+    ~GIOInputStream() {
+      g_object_unref(input_stream_);
+    }
+
+    // Returns the wrapped stream without adding a reference.
+    GInputStream *get_input_stream() const {
+      return input_stream_;
+    }
+
+    arrow::Status Close() override {
+      GError *error = NULL;
+      if (g_input_stream_close(input_stream_, NULL, &error)) {
+        return arrow::Status::OK();
+      } else {
+        return io_error_to_status(error, "[gio-input-stream][close]");
+      }
+    }
+
+    arrow::Status Tell(int64_t *position) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][tell]");
+      }
+
+      *position = g_seekable_tell(G_SEEKABLE(input_stream_));
+      return arrow::Status::OK();
+    }
+
+    // Reads up to n_bytes into out. *n_read_bytes receives the number of
+    // bytes actually read, which may be less than n_bytes.
+    arrow::Status Read(int64_t n_bytes,
+                       int64_t *n_read_bytes,
+                       uint8_t *out) override {
+      GError *error = NULL;
+      *n_read_bytes = g_input_stream_read(input_stream_,
+                                          out,
+                                          n_bytes,
+                                          NULL,
+                                          &error);
+      if (*n_read_bytes == -1) {
+        return io_error_to_status(error, "[gio-input-stream][read]");
+      } else {
+        return arrow::Status::OK();
+      }
+    }
+
+    // Allocates an n_bytes buffer from the default memory pool, reads
+    // into it and shrinks it to the number of bytes actually read.
+    arrow::Status Read(int64_t n_bytes,
+                       std::shared_ptr<arrow::Buffer> *out) override {
+      arrow::MemoryPool *pool = arrow::default_memory_pool();
+      std::shared_ptr<arrow::ResizableBuffer> buffer;
+      ARROW_RETURN_NOT_OK(AllocateResizableBuffer(pool, n_bytes, &buffer));
+
+      GError *error = NULL;
+      auto n_read_bytes = g_input_stream_read(input_stream_,
+                                              buffer->mutable_data(),
+                                              n_bytes,
+                                              NULL,
+                                              &error);
+      if (n_read_bytes == -1) {
+        return io_error_to_status(error, "[gio-input-stream][read][buffer]");
+      } else {
+        if (n_read_bytes < n_bytes) {
+          ARROW_RETURN_NOT_OK(buffer->Resize(n_read_bytes));
+        }
+        *out = buffer;
+        return arrow::Status::OK();
+      }
+    }
+
+    // Seeks to the absolute byte offset position.
+    arrow::Status Seek(int64_t position) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][seek]");
+      }
+
+      GError *error = NULL;
+      if (g_seekable_seek(G_SEEKABLE(input_stream_),
+                          position,
+                          G_SEEK_SET,
+                          NULL,
+                          &error)) {
+        return arrow::Status::OK();
+      } else {
+        return io_error_to_status(error, "[gio-input-stream][seek]");
+      }
+    }
+
+    // Computes the size by seeking to the end of the stream, reading the
+    // position there, and then seeking back to the original position.
+    arrow::Status GetSize(int64_t *size) override {
+      if (!G_IS_SEEKABLE(input_stream_)) {
+        return not_seekable_status("[gio-input-stream][size]");
+      }
+
+      auto current_position = g_seekable_tell(G_SEEKABLE(input_stream_));
+      GError *error = NULL;
+      if (!g_seekable_seek(G_SEEKABLE(input_stream_),
+                           0,
+                           G_SEEK_END,
+                           NULL,
+                           &error)) {
+        return io_error_to_status(error, "[gio-input-stream][size][seek]");
+      }
+      *size = g_seekable_tell(G_SEEKABLE(input_stream_));
+      if (!g_seekable_seek(G_SEEKABLE(input_stream_),
+                           current_position,
+                           G_SEEK_SET,
+                           NULL,
+                           &error)) {
+        return io_error_to_status(error,
+                                  "[gio-input-stream][size][seek][restore]");
+      }
+      return arrow::Status::OK();
+    }
+
+    bool supports_zero_copy() const override {
+      return false;
+    }
+
+  private:
+    GInputStream *input_stream_;
+
+    // Builds the NotImplemented status returned by the seek-related
+    // methods when the wrapped stream does not implement GSeekable.
+    // context is e.g. "[gio-input-stream][tell]".
+    arrow::Status not_seekable_status(const char *context) const {
+      std::string message(context);
+      message += " not seekable input stream: <";
+      message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_));
+      message += ">";
+      return arrow::Status::NotImplemented(message);
+    }
+
+    // Converts a GError into an IOError status and frees the GError.
+    arrow::Status io_error_to_status(GError *error, const char *context) {
+      std::stringstream message;
+      message << context << ": " << g_quark_to_string(error->domain);
+      message << "(" << error->code << "): ";
+      message << error->message;
+      g_error_free(error);
+      return arrow::Status::IOError(message.str());
+    }
+  };
+} // namespace garrow
+
+G_BEGIN_DECLS
+
+/*
+ * Registers GArrowGIOInputStream as a subclass of
+ * GArrowSeekableInputStream. All stream behaviour comes from the
+ * garrow::GIOInputStream passed in via the "input-stream" property, so
+ * neither the instance nor the class needs extra setup.
+ */
+G_DEFINE_TYPE(GArrowGIOInputStream,
+              garrow_gio_input_stream,
+              GARROW_TYPE_SEEKABLE_INPUT_STREAM);
+
+/* Instance initializer: intentionally empty. */
+static void
+garrow_gio_input_stream_init(GArrowGIOInputStream *object)
+{
+}
+
+/* Class initializer: no virtual methods or properties are overridden. */
+static void
+garrow_gio_input_stream_class_init(GArrowGIOInputStreamClass *klass)
+{
+}
+
+/**
+ * garrow_gio_input_stream_new:
+ * @gio_input_stream: The stream to be read.
+ *
+ * The underlying Arrow stream takes its own reference on
+ * @gio_input_stream.
+ *
+ * Returns: (transfer full): A newly created #GArrowGIOInputStream
+ *   that reads from @gio_input_stream, or %NULL if @gio_input_stream
+ *   is not a #GInputStream.
+ *
+ * Since: 0.5.0
+ */
+GArrowGIOInputStream *
+garrow_gio_input_stream_new(GInputStream *gio_input_stream)
+{
+  // Reject NULL or non-GInputStream arguments up front. Otherwise
+  // garrow::GIOInputStream would g_object_ref() an invalid pointer.
+  g_return_val_if_fail(G_IS_INPUT_STREAM(gio_input_stream), NULL);
+
+  auto arrow_input_stream =
+    std::make_shared<garrow::GIOInputStream>(gio_input_stream);
+  auto object = g_object_new(GARROW_TYPE_GIO_INPUT_STREAM,
+                             "input-stream", &arrow_input_stream,
+                             NULL);
+  return GARROW_GIO_INPUT_STREAM(object);
+}
+
+/**
+ * garrow_gio_input_stream_get_gio_input_stream:
+ * @input_stream: A #GArrowGIOInputStream.
+ *
+ * Returns: (transfer none): The wrapped #GInputStream.
+ *
+ * Since: 0.5.0
+ */
+GInputStream *
+garrow_gio_input_stream_get_gio_input_stream(GArrowGIOInputStream *input_stream)
+{
+  // The raw Arrow stream is expected to be the garrow::GIOInputStream
+  // installed by garrow_gio_input_stream_new(), so an unchecked
+  // static downcast is used here.
+  auto gio_backed_stream = std::static_pointer_cast<garrow::GIOInputStream>(
+    garrow_input_stream_get_raw(GARROW_INPUT_STREAM(input_stream)));
+  return gio_backed_stream->get_input_stream();
+}
+
+
+G_END_DECLS
+
GArrowInputStream *
garrow_input_stream_new_raw(std::shared_ptr<arrow::io::InputStream> *arrow_input_stream)
{
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/arrow-glib/input-stream.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h
index 8a4d362..120839d 100644
--- a/c_glib/arrow-glib/input-stream.h
+++ b/c_glib/arrow-glib/input-stream.h
@@ -19,6 +19,8 @@
#pragma once
+#include <gio/gio.h>
+
#include <arrow-glib/buffer.h>
#include <arrow-glib/tensor.h>
@@ -229,4 +231,54 @@ GType garrow_memory_mapped_input_stream_get_type(void) G_GNUC_CONST;
GArrowMemoryMappedInputStream *garrow_memory_mapped_input_stream_new(const gchar *path,
GError **error);
+
+/* Standard GObject type macros for #GArrowGIOInputStream. */
+#define GARROW_TYPE_GIO_INPUT_STREAM \
+ (garrow_gio_input_stream_get_type())
+#define GARROW_GIO_INPUT_STREAM(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj), \
+ GARROW_TYPE_GIO_INPUT_STREAM, \
+ GArrowGIOInputStream))
+#define GARROW_GIO_INPUT_STREAM_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass), \
+ GARROW_TYPE_GIO_INPUT_STREAM, \
+ GArrowGIOInputStreamClass))
+#define GARROW_IS_GIO_INPUT_STREAM(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj), \
+ GARROW_TYPE_GIO_INPUT_STREAM))
+#define GARROW_IS_GIO_INPUT_STREAM_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass), \
+ GARROW_TYPE_GIO_INPUT_STREAM))
+#define GARROW_GIO_INPUT_STREAM_GET_CLASS(obj) \
+ (G_TYPE_INSTANCE_GET_CLASS((obj), \
+ GARROW_TYPE_GIO_INPUT_STREAM, \
+ GArrowGIOInputStreamClass))
+
+typedef struct _GArrowGIOInputStream GArrowGIOInputStream;
+#ifndef __GTK_DOC_IGNORE__
+typedef struct _GArrowGIOInputStreamClass GArrowGIOInputStreamClass;
+#endif
+
+/**
+ * GArrowGIOInputStream:
+ *
+ * It's a seekable input stream that reads data from a `GInputStream`,
+ * so GIO input sources (for example local files, in-memory data or
+ * sockets) can be read as Arrow input. Seeking requires the wrapped
+ * `GInputStream` to implement `GSeekable`.
+ *
+ * Since: 0.5.0
+ */
+struct _GArrowGIOInputStream
+{
+ /*< private >*/
+ GArrowSeekableInputStream parent_instance;
+};
+
+#ifndef __GTK_DOC_IGNORE__
+struct _GArrowGIOInputStreamClass
+{
+ GArrowSeekableInputStreamClass parent_class;
+};
+#endif
+
+GType garrow_gio_input_stream_get_type(void) G_GNUC_CONST;
+
+/* Constructor and accessor; documented in input-stream.cpp. */
+GArrowGIOInputStream *garrow_gio_input_stream_new(GInputStream *gio_input_stream);
+GInputStream *garrow_gio_input_stream_get_gio_input_stream(GArrowGIOInputStream *input_stream);
+
G_END_DECLS
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/configure.ac
----------------------------------------------------------------------
diff --git a/c_glib/configure.ac b/c_glib/configure.ac
index e010d96..d4e828b 100644
--- a/c_glib/configure.ac
+++ b/c_glib/configure.ac
@@ -56,7 +56,7 @@ fi
AC_SUBST(GARROW_CFLAGS)
AC_SUBST(GARROW_CXXFLAGS)
-AM_PATH_GLIB_2_0([2.32.4], [], [], [gobject])
+AM_PATH_GLIB_2_0([2.32.4], [], [], [gobject gio])
GOBJECT_INTROSPECTION_REQUIRE([1.32.1])
GTK_DOC_CHECK([1.18-2])
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/example/go/arrow-1.0/arrow.go.in
----------------------------------------------------------------------
diff --git a/c_glib/example/go/arrow-1.0/arrow.go.in b/c_glib/example/go/arrow-1.0/arrow.go.in
index bd124af..08ee13d 100644
--- a/c_glib/example/go/arrow-1.0/arrow.go.in
+++ b/c_glib/example/go/arrow-1.0/arrow.go.in
@@ -31,6 +31,7 @@ import "unsafe"
import (
"gir/glib-2.0"
"gir/gobject-2.0"
+ "gir/gio-2.0"
)
[<.go_utils>]
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/test/run-test.rb
----------------------------------------------------------------------
diff --git a/c_glib/test/run-test.rb b/c_glib/test/run-test.rb
index 43bb220..75ff34f 100755
--- a/c_glib/test/run-test.rb
+++ b/c_glib/test/run-test.rb
@@ -31,6 +31,7 @@ ENV["GI_TYPELIB_PATH"] = [
require "gi"
+Gio = GI.load("Gio")
Arrow = GI.load("Arrow")
module Arrow
class Buffer
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/c_glib/test/test-gio-input-stream.rb
----------------------------------------------------------------------
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
new file mode 100644
index 0000000..baa978e
--- /dev/null
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class TestGIOInputStream < Test::Unit::TestCase
+ def test_reader_backend
+ tempfile = Tempfile.open("arrow-gio-input-stream")
+ output = Arrow::FileOutputStream.new(tempfile.path, false)
+ begin
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+ file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
+ begin
+ record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ file_writer.write_record_batch(record_batch)
+ ensure
+ file_writer.close
+ end
+ ensure
+ output.close
+ end
+
+ file = Gio::File.new_for_path(tempfile.path)
+ input_stream = file.read
+ input = Arrow::GIOInputStream.new(input_stream)
+ begin
+ file_reader = Arrow::RecordBatchFileReader.new(input)
+ assert_equal(["enabled"],
+ file_reader.schema.fields.collect(&:name))
+ ensure
+ input.close
+ end
+ end
+
+ def test_getter
+ input_stream = Gio::MemoryInputStream.new("Hello")
+ input = Arrow::GIOInputStream.new(input_stream)
+ assert_equal(input_stream, input.gio_input_stream)
+ end
+end
http://git-wip-us.apache.org/repos/asf/arrow/blob/e438e151/ci/travis_before_script_c_glib.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_c_glib.sh b/ci/travis_before_script_c_glib.sh
index 6387f4d..6547ea4 100755
--- a/ci/travis_before_script_c_glib.sh
+++ b/ci/travis_before_script_c_glib.sh
@@ -44,6 +44,12 @@ luarocks install lgi
go get github.com/linuxdeepin/go-gir-generator || :
pushd $GOPATH/src/github.com/linuxdeepin/go-gir-generator
+rm lib.in/gio-2.0/gdk_workaround.go
+mv lib.in/gio-2.0/config.json{,.orig}
+sed \
+ -e 's/\("Settings",\)/\/\/ \1/g' \
+ -e 's/\("SettingsBackend",\)/\/\/ \1/g' \
+ lib.in/gio-2.0/config.json.orig > lib.in/gio-2.0/config.json
mv Makefile{,.orig}
sed -e 's/ gudev-1.0//' Makefile.orig > Makefile
mkdir -p out/src/gir/gudev-1.0