You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/11/22 22:31:55 UTC

[arrow] branch master updated: ARROW-3856: [Ruby] Support compressed CSV save/load

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 80238f2  ARROW-3856: [Ruby] Support compressed CSV save/load
80238f2 is described below

commit 80238f2ef54916776282cc71cb567dea72476224
Author: Kouhei Sutou <ko...@clear-code.com>
AuthorDate: Thu Nov 22 23:31:45 2018 +0100

    ARROW-3856: [Ruby] Support compressed CSV save/load
    
    Author: Kouhei Sutou <ko...@clear-code.com>
    
    Closes #3015 from kou/ruby-csv-compressed and squashes the following commits:
    
    b3099ca9 <Kouhei Sutou>  Support compressed CSV save/load
---
 .../lib/arrow/compression-type.rb}                 | 27 ++++----
 ruby/red-arrow/lib/arrow/csv-loader.rb             | 24 +++++--
 ruby/red-arrow/lib/arrow/loader.rb                 |  8 ++-
 .../lib/arrow/path-extension.rb}                   | 37 +++++++----
 ruby/red-arrow/lib/arrow/table-loader.rb           | 60 ++++++++++-------
 ruby/red-arrow/lib/arrow/table-saver.rb            | 77 +++++++++++++++-------
 .../{test/helper.rb => lib/arrow/writable.rb}      | 15 ++---
 ruby/red-arrow/test/helper.rb                      |  1 +
 ruby/red-arrow/test/test-table.rb                  | 34 +++++++++-
 .../lib/parquet/arrow-table-loadable.rb            |  4 +-
 .../red-parquet/lib/parquet/arrow-table-savable.rb |  4 +-
 11 files changed, 196 insertions(+), 95 deletions(-)

diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-arrow/lib/arrow/compression-type.rb
similarity index 71%
copy from ruby/red-parquet/lib/parquet/arrow-table-savable.rb
copy to ruby/red-arrow/lib/arrow/compression-type.rb
index 56cb3f4..b913e48 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-arrow/lib/arrow/compression-type.rb
@@ -15,20 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-module Parquet
-  module ArrowTableSavable
-    private
-    def save_as_parquet(path)
-      chunk_size = @options[:chunk_size] || 1024 # TODO
-      Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
-        writer.write_table(@table, chunk_size)
+module Arrow
+  class CompressionType
+    EXTENSIONS = {}
+    values.each do |value|
+      case value
+      when UNCOMPRESSED
+      when GZIP
+        EXTENSIONS["gz"] = value
+      else
+        EXTENSIONS[value.nick] = value
       end
     end
-  end
-end
 
-module Arrow
-  class TableSaver
-    include Parquet::ArrowTableSavable
+    class << self
+      def resolve_extension(extension)
+        EXTENSIONS[extension.to_s]
+      end
+    end
   end
 end
diff --git a/ruby/red-arrow/lib/arrow/csv-loader.rb b/ruby/red-arrow/lib/arrow/csv-loader.rb
index 3aa85bf..bb1f419 100644
--- a/ruby/red-arrow/lib/arrow/csv-loader.rb
+++ b/ruby/red-arrow/lib/arrow/csv-loader.rb
@@ -30,6 +30,7 @@ module Arrow
     def initialize(path_or_data, **options)
       @path_or_data = path_or_data
       @options = options
+      @compression = @options.delete(:compression)
     end
 
     def load
@@ -115,12 +116,25 @@ module Arrow
       options
     end
 
+    def open_input(raw_input)
+      if @compression
+        codec = Codec.new(@compression)
+        CompressedInputStream.open(codec, raw_input) do |input|
+          yield(input)
+        end
+      else
+        yield(raw_input)
+      end
+    end
+
     def load_from_path(path)
       options = reader_options
       if options
         begin
-          MemoryMappedInputStream.open(path.to_s) do |input|
-            return CSVReader.new(input, options).read
+          MemoryMappedInputStream.open(path.to_s) do |raw_input|
+            open_input(raw_input) do |input|
+              return CSVReader.new(input, options).read
+            end
           end
         rescue Arrow::Error::Invalid
         end
@@ -136,8 +150,10 @@ module Arrow
       options = reader_options
       if options
         begin
-          BufferInputStream.open(Buffer.new(data)) do |input|
-            return CSVReader.new(input, options).read
+          BufferInputStream.open(Buffer.new(data)) do |raw_input|
+            open_input(raw_input) do |input|
+              return CSVReader.new(input, options).read
+            end
           end
         rescue Arrow::Error::Invalid
         end
diff --git a/ruby/red-arrow/lib/arrow/loader.rb b/ruby/red-arrow/lib/arrow/loader.rb
index e147113..15dd025 100644
--- a/ruby/red-arrow/lib/arrow/loader.rb
+++ b/ruby/red-arrow/lib/arrow/loader.rb
@@ -35,6 +35,7 @@ module Arrow
       require "arrow/array-builder"
       require "arrow/chunked-array"
       require "arrow/column"
+      require "arrow/compression-type"
       require "arrow/csv-loader"
       require "arrow/csv-read-options"
       require "arrow/data-type"
@@ -43,8 +44,11 @@ module Arrow
       require "arrow/date64-array"
       require "arrow/date64-array-builder"
       require "arrow/field"
+      require "arrow/path-extension"
       require "arrow/record"
       require "arrow/record-batch"
+      require "arrow/record-batch-file-reader"
+      require "arrow/record-batch-stream-reader"
       require "arrow/rolling-window"
       require "arrow/schema"
       require "arrow/slicer"
@@ -58,9 +62,7 @@ module Arrow
       require "arrow/tensor"
       require "arrow/timestamp-array"
       require "arrow/timestamp-array-builder"
-
-      require "arrow/record-batch-file-reader"
-      require "arrow/record-batch-stream-reader"
+      require "arrow/writable"
     end
 
     def load_object_info(info)
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-arrow/lib/arrow/path-extension.rb
similarity index 59%
copy from ruby/red-parquet/lib/parquet/arrow-table-savable.rb
copy to ruby/red-arrow/lib/arrow/path-extension.rb
index 56cb3f4..7d32672 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-arrow/lib/arrow/path-extension.rb
@@ -15,20 +15,31 @@
 # specific language governing permissions and limitations
 # under the License.
 
-module Parquet
-  module ArrowTableSavable
-    private
-    def save_as_parquet(path)
-      chunk_size = @options[:chunk_size] || 1024 # TODO
-      Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
-        writer.write_table(@table, chunk_size)
-      end
+module Arrow
+  class PathExtension
+    def initialize(path)
+      @path = path
     end
-  end
-end
 
-module Arrow
-  class TableSaver
-    include Parquet::ArrowTableSavable
+    def extract
+      basename = ::File.basename(@path)
+      components = basename.split(".")
+      return {} if components.size == 1
+
+      extension = components.last.downcase
+      if components.size > 2
+        compression = CompressionType.resolve_extension(extension)
+        if compression
+          {
+            format: components[-2].downcase,
+            compression: compression,
+          }
+        else
+          {format: extension}
+        end
+      else
+        {format: extension}
+      end
+    end
   end
 end
diff --git a/ruby/red-arrow/lib/arrow/table-loader.rb b/ruby/red-arrow/lib/arrow/table-loader.rb
index db9a1fb..a6ce9a1 100644
--- a/ruby/red-arrow/lib/arrow/table-loader.rb
+++ b/ruby/red-arrow/lib/arrow/table-loader.rb
@@ -24,15 +24,14 @@ module Arrow
     end
 
     def initialize(path, options={})
+      path = path.to_path if path.respond_to?(:to_path)
       @path = path
       @options = options
+      fill_options
     end
 
     def load
-      path = @path
-      path = path.to_path if path.respond_to?(:to_path)
-      format = @options[:format] || guess_format(path) || :arrow
-
+      format = @options[:format]
       custom_load_method = "load_as_#{format}"
       unless respond_to?(custom_load_method, true)
         available_formats = []
@@ -47,17 +46,32 @@ module Arrow
         message << "]: #{format.inspect}"
         raise ArgumentError, message
       end
-      __send__(custom_load_method, path)
+      if method(custom_load_method).arity.zero?
+        __send__(custom_load_method)
+      else
+        # For backward compatibility.
+        __send__(custom_load_method, @path)
+      end
     end
 
     private
-    def guess_format(path)
-      extension = ::File.extname(path).gsub(/\A\./, "").downcase
-      return nil if extension.empty?
-
-      return extension if respond_to?("load_as_#{extension}", true)
+    def fill_options
+      if @options[:format] and @options.key?(:compression)
+        return
+      end
 
-      nil
+      extension = PathExtension.new(@path)
+      info = extension.extract
+      format = info[:format]
+      @options = @options.dup
+      if respond_to?("load_as_#{format}", true)
+        @options[:format] ||= format.to_sym
+      else
+        @options[:format] ||= :arrow
+      end
+      unless @options.key?(:compression)
+        @options[:compression] = info[:compression]
+      end
     end
 
     def load_raw(input, reader)
@@ -77,7 +91,7 @@ module Arrow
       table
     end
 
-    def load_as_arrow(path)
+    def load_as_arrow
       input = nil
       reader = nil
       error = nil
@@ -86,7 +100,7 @@ module Arrow
         RecordBatchStreamReader,
       ]
       reader_class_candidates.each do |reader_class_candidate|
-        input = MemoryMappedInputStream.new(path)
+        input = MemoryMappedInputStream.new(@path)
         begin
           reader = reader_class_candidate.new(input)
         rescue Arrow::Error
@@ -99,21 +113,21 @@ module Arrow
       load_raw(input, reader)
     end
 
-    def load_as_batch(path)
-      input = MemoryMappedInputStream.new(path)
+    def load_as_batch
+      input = MemoryMappedInputStream.new(@path)
       reader = RecordBatchFileReader.new(input)
       load_raw(input, reader)
     end
 
-    def load_as_stream(path)
-      input = MemoryMappedInputStream.new(path)
+    def load_as_stream
+      input = MemoryMappedInputStream.new(@path)
       reader = RecordBatchStreamReader.new(input)
       load_raw(input, reader)
     end
 
     if Arrow.const_defined?(:ORCFileReader)
-      def load_as_orc(path)
-        input = MemoryMappedInputStream.new(path)
+      def load_as_orc
+        input = MemoryMappedInputStream.new(@path)
         reader = ORCFileReader.new(input)
         field_indexes = @options[:field_indexes]
         reader.set_field_indexes(field_indexes) if field_indexes
@@ -123,14 +137,14 @@ module Arrow
       end
     end
 
-    def load_as_csv(path)
+    def load_as_csv
       options = @options.dup
       options.delete(:format)
-      CSVLoader.load(Pathname.new(path), options)
+      CSVLoader.load(Pathname.new(@path), options)
     end
 
-    def load_as_feather(path)
-      input = MemoryMappedInputStream.new(path)
+    def load_as_feather
+      input = MemoryMappedInputStream.new(@path)
       reader = FeatherFileReader.new(input)
       table = reader.read
       table.instance_variable_set(:@input, input)
diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb
index bc315a3..99e6e49 100644
--- a/ruby/red-arrow/lib/arrow/table-saver.rb
+++ b/ruby/red-arrow/lib/arrow/table-saver.rb
@@ -25,15 +25,14 @@ module Arrow
 
     def initialize(table, path, options={})
       @table = table
+      path = path.to_path if path.respond_to?(:to_path)
       @path = path
       @options = options
+      fill_options
     end
 
     def save
-      path = @path
-      path = path.to_path if path.respond_to?(:to_path)
-      format = @options[:format] || guess_format(path) || :arrow
-
+      format = @options[:format]
       custom_save_method = "save_as_#{format}"
       unless respond_to?(custom_save_method, true)
         available_formats = []
@@ -48,41 +47,73 @@ module Arrow
         message << "]: #{format.inspect}"
         raise ArgumentError, message
       end
-      __send__(custom_save_method, path)
+      if method(custom_save_method).arity.zero?
+        __send__(custom_save_method)
+      else
+        # For backward compatibility.
+        __send__(custom_save_method, @path)
+      end
     end
 
     private
-    def guess_format(path)
-      extension = ::File.extname(path).gsub(/\A\./, "").downcase
-      return nil if extension.empty?
-
-      return extension if respond_to?("save_as_#{extension}", true)
+    def fill_options
+      if @options[:format] and @options.key?(:compression)
+        return
+      end
 
-      nil
+      extension = PathExtension.new(@path)
+      info = extension.extract
+      format = info[:format]
+      @options = @options.dup
+      if respond_to?("save_as_#{format}", true)
+        @options[:format] ||= format.to_sym
+      else
+        @options[:format] ||= :arrow
+      end
+      unless @options.key?(:compression)
+        @options[:compression] = info[:compression]
+      end
     end
 
-    def save_raw(writer_class, path)
-      FileOutputStream.open(path, false) do |output|
+    def save_raw(writer_class)
+      FileOutputStream.open(@path, false) do |output|
         writer_class.open(output, @table.schema) do |writer|
           writer.write_table(@table)
         end
       end
     end
 
-    def save_as_arrow(path)
-      save_as_batch(path)
+    def save_as_arrow
+      save_as_batch
+    end
+
+    def save_as_batch
+      save_raw(RecordBatchFileWriter)
     end
 
-    def save_as_batch(path)
-      save_raw(RecordBatchFileWriter, path)
+    def save_as_stream
+      save_raw(RecordBatchStreamWriter)
     end
 
-    def save_as_stream(path)
-      save_raw(RecordBatchStreamWriter, path)
+    def open_output
+      compression = @options[:compression]
+      if compression
+        codec = Codec.new(compression)
+        FileOutputStream.open(@path, false) do |raw_output|
+          CompressedOutputStream.open(codec, raw_output) do |output|
+            yield(output)
+          end
+        end
+      else
+        ::File.open(@path, "w") do |output|
+          yield(output)
+        end
+      end
     end
 
-    def save_as_csv(path)
-      CSV.open(path, "w") do |csv|
+    def save_as_csv
+      open_output do |output|
+        csv = CSV.new(output)
         names = @table.schema.fields.collect(&:name)
         csv << names
         @table.each_record(reuse_record: true) do |record|
@@ -93,8 +124,8 @@ module Arrow
       end
     end
 
-    def save_as_feather(path)
-      FileOutputStream.open(path, false) do |output|
+    def save_as_feather
+      FileOutputStream.open(@path, false) do |output|
         FeatherFileWriter.open(output) do |writer|
           writer.write(@table)
         end
diff --git a/ruby/red-arrow/test/helper.rb b/ruby/red-arrow/lib/arrow/writable.rb
similarity index 84%
copy from ruby/red-arrow/test/helper.rb
copy to ruby/red-arrow/lib/arrow/writable.rb
index c51f8ba..02be9dd 100644
--- a/ruby/red-arrow/test/helper.rb
+++ b/ruby/red-arrow/lib/arrow/writable.rb
@@ -15,13 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-require_relative "../version"
-
-require "arrow"
-
-require "pathname"
-require "tempfile"
-
-require "test-unit"
-
-require_relative "helper/fixture"
+module Arrow
+  module Writable
+    alias_method :<<, :write
+  end
+end
diff --git a/ruby/red-arrow/test/helper.rb b/ruby/red-arrow/test/helper.rb
index c51f8ba..2aa868b 100644
--- a/ruby/red-arrow/test/helper.rb
+++ b/ruby/red-arrow/test/helper.rb
@@ -21,6 +21,7 @@ require "arrow"
 
 require "pathname"
 require "tempfile"
+require "zlib"
 
 require "test-unit"
 
diff --git a/ruby/red-arrow/test/test-table.rb b/ruby/red-arrow/test/test-table.rb
index 3eaaf63..1576f77 100644
--- a/ruby/red-arrow/test/test-table.rb
+++ b/ruby/red-arrow/test/test-table.rb
@@ -423,20 +423,30 @@ class TableTest < Test::Unit::TestCase
                                        :schema => @table.schema))
       end
 
+      test("csv.gz") do
+        file = Tempfile.new(["red-arrow", ".csv.gz"])
+        @table.save(file.path)
+        assert_equal(@table,
+                     Arrow::Table.load(file.path,
+                                       :format => :csv,
+                                       :compression => :gzip,
+                                       :schema => @table.schema))
+      end
+
       sub_test_case("load: auto detect") do
-        test(":batch") do
+        test("batch") do
           file = Tempfile.new(["red-arrow", ".arrow"])
           @table.save(file.path, :format => :batch)
           assert_equal(@table, Arrow::Table.load(file.path))
         end
 
-        test(":stream") do
+        test("stream") do
           file = Tempfile.new(["red-arrow", ".arrow"])
           @table.save(file.path, :format => :stream)
           assert_equal(@table, Arrow::Table.load(file.path))
         end
 
-        test(":csv") do
+        test("csv") do
           path = fixture_path("with-header.csv")
           assert_equal(<<-TABLE, Arrow::Table.load(path, skip_lines: /^#/).to_s)
 	name	score
@@ -445,6 +455,24 @@ class TableTest < Test::Unit::TestCase
 2	chris	   -1
           TABLE
         end
+
+        test("csv.gz") do
+          file = Tempfile.new(["red-arrow", ".csv.gz"])
+          Zlib::GzipWriter.wrap(file) do |gz|
+            gz.write(<<-CSV)
+name,score
+alice,10
+bob,29
+chris,-1
+            CSV
+          end
+          assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
+	name	score
+0	alice	   10
+1	bob 	   29
+2	chris	   -1
+          TABLE
+        end
       end
     end
   end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
index 56585b7..4df527b 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
@@ -18,8 +18,8 @@
 module Parquet
   module ArrowTableLoadable
     private
-    def load_as_parquet(path)
-      reader = Parquet::ArrowFileReader.new(path)
+    def load_as_parquet
+      reader = Parquet::ArrowFileReader.new(@path)
       reader.use_threads = (@options[:use_threads] != false)
       reader.read_table
     end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
index 56cb3f4..5d96d5f 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
@@ -18,9 +18,9 @@
 module Parquet
   module ArrowTableSavable
     private
-    def save_as_parquet(path)
+    def save_as_parquet
       chunk_size = @options[:chunk_size] || 1024 # TODO
-      Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
+      Parquet::ArrowFileWriter.open(@table.schema, @path) do |writer|
         writer.write_table(@table, chunk_size)
       end
     end