You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/11/22 22:31:55 UTC
[arrow] branch master updated: ARROW-3856: [Ruby] Support compressed CSV save/load
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 80238f2 ARROW-3856: [Ruby] Support compressed CSV save/load
80238f2 is described below
commit 80238f2ef54916776282cc71cb567dea72476224
Author: Kouhei Sutou <ko...@clear-code.com>
AuthorDate: Thu Nov 22 23:31:45 2018 +0100
ARROW-3856: [Ruby] Support compressed CSV save/load
Author: Kouhei Sutou <ko...@clear-code.com>
Closes #3015 from kou/ruby-csv-compressed and squashes the following commits:
b3099ca9 <Kouhei Sutou> Support compressed CSV save/load
---
.../lib/arrow/compression-type.rb} | 27 ++++----
ruby/red-arrow/lib/arrow/csv-loader.rb | 24 +++++--
ruby/red-arrow/lib/arrow/loader.rb | 8 ++-
.../lib/arrow/path-extension.rb} | 37 +++++++----
ruby/red-arrow/lib/arrow/table-loader.rb | 60 ++++++++++-------
ruby/red-arrow/lib/arrow/table-saver.rb | 77 +++++++++++++++-------
.../{test/helper.rb => lib/arrow/writable.rb} | 15 ++---
ruby/red-arrow/test/helper.rb | 1 +
ruby/red-arrow/test/test-table.rb | 34 +++++++++-
.../lib/parquet/arrow-table-loadable.rb | 4 +-
.../red-parquet/lib/parquet/arrow-table-savable.rb | 4 +-
11 files changed, 196 insertions(+), 95 deletions(-)
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-arrow/lib/arrow/compression-type.rb
similarity index 71%
copy from ruby/red-parquet/lib/parquet/arrow-table-savable.rb
copy to ruby/red-arrow/lib/arrow/compression-type.rb
index 56cb3f4..b913e48 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-arrow/lib/arrow/compression-type.rb
@@ -15,20 +15,23 @@
# specific language governing permissions and limitations
# under the License.
-module Parquet
- module ArrowTableSavable
- private
- def save_as_parquet(path)
- chunk_size = @options[:chunk_size] || 1024 # TODO
- Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
- writer.write_table(@table, chunk_size)
+module Arrow
+ class CompressionType
+ EXTENSIONS = {}
+ values.each do |value|
+ case value
+ when UNCOMPRESSED
+ when GZIP
+ EXTENSIONS["gz"] = value
+ else
+ EXTENSIONS[value.nick] = value
end
end
- end
-end
-module Arrow
- class TableSaver
- include Parquet::ArrowTableSavable
+ class << self
+ def resolve_extension(extension)
+ EXTENSIONS[extension.to_s]
+ end
+ end
end
end
diff --git a/ruby/red-arrow/lib/arrow/csv-loader.rb b/ruby/red-arrow/lib/arrow/csv-loader.rb
index 3aa85bf..bb1f419 100644
--- a/ruby/red-arrow/lib/arrow/csv-loader.rb
+++ b/ruby/red-arrow/lib/arrow/csv-loader.rb
@@ -30,6 +30,7 @@ module Arrow
def initialize(path_or_data, **options)
@path_or_data = path_or_data
@options = options
+ @compression = @options.delete(:compression)
end
def load
@@ -115,12 +116,25 @@ module Arrow
options
end
+ def open_input(raw_input)
+ if @compression
+ codec = Codec.new(@compression)
+ CompressedInputStream.open(codec, raw_input) do |input|
+ yield(input)
+ end
+ else
+ yield(raw_input)
+ end
+ end
+
def load_from_path(path)
options = reader_options
if options
begin
- MemoryMappedInputStream.open(path.to_s) do |input|
- return CSVReader.new(input, options).read
+ MemoryMappedInputStream.open(path.to_s) do |raw_input|
+ open_input(raw_input) do |input|
+ return CSVReader.new(input, options).read
+ end
end
rescue Arrow::Error::Invalid
end
@@ -136,8 +150,10 @@ module Arrow
options = reader_options
if options
begin
- BufferInputStream.open(Buffer.new(data)) do |input|
- return CSVReader.new(input, options).read
+ BufferInputStream.open(Buffer.new(data)) do |raw_input|
+ open_input(raw_input) do |input|
+ return CSVReader.new(input, options).read
+ end
end
rescue Arrow::Error::Invalid
end
diff --git a/ruby/red-arrow/lib/arrow/loader.rb b/ruby/red-arrow/lib/arrow/loader.rb
index e147113..15dd025 100644
--- a/ruby/red-arrow/lib/arrow/loader.rb
+++ b/ruby/red-arrow/lib/arrow/loader.rb
@@ -35,6 +35,7 @@ module Arrow
require "arrow/array-builder"
require "arrow/chunked-array"
require "arrow/column"
+ require "arrow/compression-type"
require "arrow/csv-loader"
require "arrow/csv-read-options"
require "arrow/data-type"
@@ -43,8 +44,11 @@ module Arrow
require "arrow/date64-array"
require "arrow/date64-array-builder"
require "arrow/field"
+ require "arrow/path-extension"
require "arrow/record"
require "arrow/record-batch"
+ require "arrow/record-batch-file-reader"
+ require "arrow/record-batch-stream-reader"
require "arrow/rolling-window"
require "arrow/schema"
require "arrow/slicer"
@@ -58,9 +62,7 @@ module Arrow
require "arrow/tensor"
require "arrow/timestamp-array"
require "arrow/timestamp-array-builder"
-
- require "arrow/record-batch-file-reader"
- require "arrow/record-batch-stream-reader"
+ require "arrow/writable"
end
def load_object_info(info)
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-arrow/lib/arrow/path-extension.rb
similarity index 59%
copy from ruby/red-parquet/lib/parquet/arrow-table-savable.rb
copy to ruby/red-arrow/lib/arrow/path-extension.rb
index 56cb3f4..7d32672 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-arrow/lib/arrow/path-extension.rb
@@ -15,20 +15,31 @@
# specific language governing permissions and limitations
# under the License.
-module Parquet
- module ArrowTableSavable
- private
- def save_as_parquet(path)
- chunk_size = @options[:chunk_size] || 1024 # TODO
- Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
- writer.write_table(@table, chunk_size)
- end
+module Arrow
+ class PathExtension
+ def initialize(path)
+ @path = path
end
- end
-end
-module Arrow
- class TableSaver
- include Parquet::ArrowTableSavable
+ def extract
+ basename = ::File.basename(@path)
+ components = basename.split(".")
+ return {} if components.size == 1
+
+ extension = components.last.downcase
+ if components.size > 2
+ compression = CompressionType.resolve_extension(extension)
+ if compression
+ {
+ format: components[-2].downcase,
+ compression: compression,
+ }
+ else
+ {format: extension}
+ end
+ else
+ {format: extension}
+ end
+ end
end
end
diff --git a/ruby/red-arrow/lib/arrow/table-loader.rb b/ruby/red-arrow/lib/arrow/table-loader.rb
index db9a1fb..a6ce9a1 100644
--- a/ruby/red-arrow/lib/arrow/table-loader.rb
+++ b/ruby/red-arrow/lib/arrow/table-loader.rb
@@ -24,15 +24,14 @@ module Arrow
end
def initialize(path, options={})
+ path = path.to_path if path.respond_to?(:to_path)
@path = path
@options = options
+ fill_options
end
def load
- path = @path
- path = path.to_path if path.respond_to?(:to_path)
- format = @options[:format] || guess_format(path) || :arrow
-
+ format = @options[:format]
custom_load_method = "load_as_#{format}"
unless respond_to?(custom_load_method, true)
available_formats = []
@@ -47,17 +46,32 @@ module Arrow
message << "]: #{format.inspect}"
raise ArgumentError, message
end
- __send__(custom_load_method, path)
+ if method(custom_load_method).arity.zero?
+ __send__(custom_load_method)
+ else
+ # For backward compatibility.
+ __send__(custom_load_method, @path)
+ end
end
private
- def guess_format(path)
- extension = ::File.extname(path).gsub(/\A\./, "").downcase
- return nil if extension.empty?
-
- return extension if respond_to?("load_as_#{extension}", true)
+ def fill_options
+ if @options[:format] and @options.key?(:compression)
+ return
+ end
- nil
+ extension = PathExtension.new(@path)
+ info = extension.extract
+ format = info[:format]
+ @options = @options.dup
+ if respond_to?("load_as_#{format}", true)
+ @options[:format] ||= format.to_sym
+ else
+ @options[:format] ||= :arrow
+ end
+ unless @options.key?(:compression)
+ @options[:compression] = info[:compression]
+ end
end
def load_raw(input, reader)
@@ -77,7 +91,7 @@ module Arrow
table
end
- def load_as_arrow(path)
+ def load_as_arrow
input = nil
reader = nil
error = nil
@@ -86,7 +100,7 @@ module Arrow
RecordBatchStreamReader,
]
reader_class_candidates.each do |reader_class_candidate|
- input = MemoryMappedInputStream.new(path)
+ input = MemoryMappedInputStream.new(@path)
begin
reader = reader_class_candidate.new(input)
rescue Arrow::Error
@@ -99,21 +113,21 @@ module Arrow
load_raw(input, reader)
end
- def load_as_batch(path)
- input = MemoryMappedInputStream.new(path)
+ def load_as_batch
+ input = MemoryMappedInputStream.new(@path)
reader = RecordBatchFileReader.new(input)
load_raw(input, reader)
end
- def load_as_stream(path)
- input = MemoryMappedInputStream.new(path)
+ def load_as_stream
+ input = MemoryMappedInputStream.new(@path)
reader = RecordBatchStreamReader.new(input)
load_raw(input, reader)
end
if Arrow.const_defined?(:ORCFileReader)
- def load_as_orc(path)
- input = MemoryMappedInputStream.new(path)
+ def load_as_orc
+ input = MemoryMappedInputStream.new(@path)
reader = ORCFileReader.new(input)
field_indexes = @options[:field_indexes]
reader.set_field_indexes(field_indexes) if field_indexes
@@ -123,14 +137,14 @@ module Arrow
end
end
- def load_as_csv(path)
+ def load_as_csv
options = @options.dup
options.delete(:format)
- CSVLoader.load(Pathname.new(path), options)
+ CSVLoader.load(Pathname.new(@path), options)
end
- def load_as_feather(path)
- input = MemoryMappedInputStream.new(path)
+ def load_as_feather
+ input = MemoryMappedInputStream.new(@path)
reader = FeatherFileReader.new(input)
table = reader.read
table.instance_variable_set(:@input, input)
diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb
index bc315a3..99e6e49 100644
--- a/ruby/red-arrow/lib/arrow/table-saver.rb
+++ b/ruby/red-arrow/lib/arrow/table-saver.rb
@@ -25,15 +25,14 @@ module Arrow
def initialize(table, path, options={})
@table = table
+ path = path.to_path if path.respond_to?(:to_path)
@path = path
@options = options
+ fill_options
end
def save
- path = @path
- path = path.to_path if path.respond_to?(:to_path)
- format = @options[:format] || guess_format(path) || :arrow
-
+ format = @options[:format]
custom_save_method = "save_as_#{format}"
unless respond_to?(custom_save_method, true)
available_formats = []
@@ -48,41 +47,73 @@ module Arrow
message << "]: #{format.inspect}"
raise ArgumentError, message
end
- __send__(custom_save_method, path)
+ if method(custom_save_method).arity.zero?
+ __send__(custom_save_method)
+ else
+ # For backward compatibility.
+ __send__(custom_save_method, @path)
+ end
end
private
- def guess_format(path)
- extension = ::File.extname(path).gsub(/\A\./, "").downcase
- return nil if extension.empty?
-
- return extension if respond_to?("save_as_#{extension}", true)
+ def fill_options
+ if @options[:format] and @options.key?(:compression)
+ return
+ end
- nil
+ extension = PathExtension.new(@path)
+ info = extension.extract
+ format = info[:format]
+ @options = @options.dup
+ if respond_to?("save_as_#{format}", true)
+ @options[:format] ||= format.to_sym
+ else
+ @options[:format] ||= :arrow
+ end
+ unless @options.key?(:compression)
+ @options[:compression] = info[:compression]
+ end
end
- def save_raw(writer_class, path)
- FileOutputStream.open(path, false) do |output|
+ def save_raw(writer_class)
+ FileOutputStream.open(@path, false) do |output|
writer_class.open(output, @table.schema) do |writer|
writer.write_table(@table)
end
end
end
- def save_as_arrow(path)
- save_as_batch(path)
+ def save_as_arrow
+ save_as_batch
+ end
+
+ def save_as_batch
+ save_raw(RecordBatchFileWriter)
end
- def save_as_batch(path)
- save_raw(RecordBatchFileWriter, path)
+ def save_as_stream
+ save_raw(RecordBatchStreamWriter)
end
- def save_as_stream(path)
- save_raw(RecordBatchStreamWriter, path)
+ def open_output
+ compression = @options[:compression]
+ if compression
+ codec = Codec.new(compression)
+ FileOutputStream.open(@path, false) do |raw_output|
+ CompressedOutputStream.open(codec, raw_output) do |output|
+ yield(output)
+ end
+ end
+ else
+ ::File.open(@path, "w") do |output|
+ yield(output)
+ end
+ end
end
- def save_as_csv(path)
- CSV.open(path, "w") do |csv|
+ def save_as_csv
+ open_output do |output|
+ csv = CSV.new(output)
names = @table.schema.fields.collect(&:name)
csv << names
@table.each_record(reuse_record: true) do |record|
@@ -93,8 +124,8 @@ module Arrow
end
end
- def save_as_feather(path)
- FileOutputStream.open(path, false) do |output|
+ def save_as_feather
+ FileOutputStream.open(@path, false) do |output|
FeatherFileWriter.open(output) do |writer|
writer.write(@table)
end
diff --git a/ruby/red-arrow/test/helper.rb b/ruby/red-arrow/lib/arrow/writable.rb
similarity index 84%
copy from ruby/red-arrow/test/helper.rb
copy to ruby/red-arrow/lib/arrow/writable.rb
index c51f8ba..02be9dd 100644
--- a/ruby/red-arrow/test/helper.rb
+++ b/ruby/red-arrow/lib/arrow/writable.rb
@@ -15,13 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-require_relative "../version"
-
-require "arrow"
-
-require "pathname"
-require "tempfile"
-
-require "test-unit"
-
-require_relative "helper/fixture"
+module Arrow
+ module Writable
+ alias_method :<<, :write
+ end
+end
diff --git a/ruby/red-arrow/test/helper.rb b/ruby/red-arrow/test/helper.rb
index c51f8ba..2aa868b 100644
--- a/ruby/red-arrow/test/helper.rb
+++ b/ruby/red-arrow/test/helper.rb
@@ -21,6 +21,7 @@ require "arrow"
require "pathname"
require "tempfile"
+require "zlib"
require "test-unit"
diff --git a/ruby/red-arrow/test/test-table.rb b/ruby/red-arrow/test/test-table.rb
index 3eaaf63..1576f77 100644
--- a/ruby/red-arrow/test/test-table.rb
+++ b/ruby/red-arrow/test/test-table.rb
@@ -423,20 +423,30 @@ class TableTest < Test::Unit::TestCase
:schema => @table.schema))
end
+ test("csv.gz") do
+ file = Tempfile.new(["red-arrow", ".csv.gz"])
+ @table.save(file.path)
+ assert_equal(@table,
+ Arrow::Table.load(file.path,
+ :format => :csv,
+ :compression => :gzip,
+ :schema => @table.schema))
+ end
+
sub_test_case("load: auto detect") do
- test(":batch") do
+ test("batch") do
file = Tempfile.new(["red-arrow", ".arrow"])
@table.save(file.path, :format => :batch)
assert_equal(@table, Arrow::Table.load(file.path))
end
- test(":stream") do
+ test("stream") do
file = Tempfile.new(["red-arrow", ".arrow"])
@table.save(file.path, :format => :stream)
assert_equal(@table, Arrow::Table.load(file.path))
end
- test(":csv") do
+ test("csv") do
path = fixture_path("with-header.csv")
assert_equal(<<-TABLE, Arrow::Table.load(path, skip_lines: /^#/).to_s)
name score
@@ -445,6 +455,24 @@ class TableTest < Test::Unit::TestCase
2 chris -1
TABLE
end
+
+ test("csv.gz") do
+ file = Tempfile.new(["red-arrow", ".csv.gz"])
+ Zlib::GzipWriter.wrap(file) do |gz|
+ gz.write(<<-CSV)
+name,score
+alice,10
+bob,29
+chris,-1
+ CSV
+ end
+ assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
+ name score
+0 alice 10
+1 bob 29
+2 chris -1
+ TABLE
+ end
end
end
end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
index 56585b7..4df527b 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
@@ -18,8 +18,8 @@
module Parquet
module ArrowTableLoadable
private
- def load_as_parquet(path)
- reader = Parquet::ArrowFileReader.new(path)
+ def load_as_parquet
+ reader = Parquet::ArrowFileReader.new(@path)
reader.use_threads = (@options[:use_threads] != false)
reader.read_table
end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
index 56cb3f4..5d96d5f 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
@@ -18,9 +18,9 @@
module Parquet
module ArrowTableSavable
private
- def save_as_parquet(path)
+ def save_as_parquet
chunk_size = @options[:chunk_size] || 1024 # TODO
- Parquet::ArrowFileWriter.open(@table.schema, path) do |writer|
+ Parquet::ArrowFileWriter.open(@table.schema, @path) do |writer|
writer.write_table(@table, chunk_size)
end
end