You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/23 14:48:07 UTC

[arrow] branch master updated: ARROW-1782: [Python] Add pyarrow.compress, decompress APIs

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1524ed7  ARROW-1782: [Python] Add pyarrow.compress, decompress APIs
1524ed7 is described below

commit 1524ed7fc6fe5aac4528bd05b0c5690fb0f625da
Author: Wes McKinney <we...@twosigma.com>
AuthorDate: Thu Nov 23 09:48:01 2017 -0500

    ARROW-1782: [Python] Add pyarrow.compress, decompress APIs
    
    This enables bytes, Buffer, or buffer-like objects to be compressed to either Python bytes or a `pyarrow.Buffer`, and decompressed again. I would like some feedback on the API (argument names, etc.). In general, the compression API in Arrow requires knowing the size of the decompressed data up front, although some compressors (like Snappy) can report how big the result will be based only on the input buffer. For now, `pyarrow.decompress` requires `decompressed_size` to be passed for every codec.
    
    Author: Wes McKinney <we...@twosigma.com>
    
    Closes #1345 from wesm/ARROW-1782 and squashes the following commits:
    
    f297103b [Wes McKinney] More assertions, nicer error message when decompressed size not passed
    7cd0fdb7 [Wes McKinney] Tests for compress/decompress passing
    38480411 [Wes McKinney] Draft implementation of pyarrow.decompress
    29b82dca [Wes McKinney] Docstring for pyarrow.decompress
    2eeaf9a8 [Wes McKinney] Expose ResizableBuffer as Python class, test
    6968be3f [Wes McKinney] Work on compress/decompress methods
---
 cpp/src/arrow/util/compression.h     |   2 +-
 python/doc/source/api.rst            |   4 +
 python/pyarrow/__init__.py           |  17 +--
 python/pyarrow/includes/libarrow.pxd |  43 +++++--
 python/pyarrow/io.pxi                | 209 +++++++++++++++++++++++++++++++++--
 python/pyarrow/lib.pxd               |   7 ++
 python/pyarrow/plasma.pyx            |   2 +-
 python/pyarrow/public-api.pxi        |   7 ++
 python/pyarrow/tests/test_io.py      |  36 ++++++
 9 files changed, 301 insertions(+), 26 deletions(-)

diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index ae187a7..de3837e 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -27,7 +27,7 @@
 namespace arrow {
 
 struct Compression {
-  enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, ZSTD, LZ4 };
+  enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO };
 };
 
 class ARROW_EXPORT Codec {
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index fb2a286..bb2a042 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -195,7 +195,11 @@ Input / Output and Shared Memory
    :toctree: generated/
 
    allocate_buffer
+   compress
+   decompress
+   frombuffer
    Buffer
+   ResizableBuffer
    BufferReader
    BufferOutputStream
    NativeFile
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index c4db36e..0456a65 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -71,18 +71,21 @@ from pyarrow.lib import (null, bool_,
 # ARROW-1683: Remove after 0.8.0?
 from pyarrow.lib import TimestampType
 
-from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
-                         FixedSizeBufferWriter,
-                         Buffer, BufferReader, BufferOutputStream,
-                         OSFile, MemoryMappedFile, memory_map,
-                         allocate_buffer, frombuffer,
-                         memory_map, create_memory_map,
-                         have_libhdfs, have_libhdfs3, MockOutputStream)
+# Buffers, allocation
+from pyarrow.lib import (Buffer, ResizableBuffer, compress, decompress,
+                         allocate_buffer, frombuffer)
 
 from pyarrow.lib import (MemoryPool, total_allocated_bytes,
                          set_memory_pool, default_memory_pool,
                          log_memory_allocations)
 
+from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
+                         FixedSizeBufferWriter,
+                         BufferReader, BufferOutputStream,
+                         OSFile, MemoryMappedFile, memory_map,
+                         create_memory_map, have_libhdfs, have_libhdfs3,
+                         MockOutputStream)
+
 from pyarrow.lib import (ChunkedArray, Column, RecordBatch, Table,
                          concat_tables)
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3246481..5d68607 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -169,26 +169,27 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
     cdef cppclass CBuffer" arrow::Buffer":
         CBuffer(const uint8_t* data, int64_t size)
-        uint8_t* data()
+        const uint8_t* data()
+        uint8_t* mutable_data()
         int64_t size()
         shared_ptr[CBuffer] parent()
         c_bool is_mutable() const
+        c_bool Equals(const CBuffer& other)
 
     cdef cppclass CMutableBuffer" arrow::MutableBuffer"(CBuffer):
         CMutableBuffer(const uint8_t* data, int64_t size)
-        uint8_t* mutable_data()
+
+    cdef cppclass CResizableBuffer" arrow::ResizableBuffer"(CMutableBuffer):
+        CStatus Resize(const int64_t new_size, c_bool shrink_to_fit)
+        CStatus Reserve(const int64_t new_size)
 
     CStatus AllocateBuffer(CMemoryPool* pool, const int64_t size,
                            shared_ptr[CBuffer]* out)
 
     CStatus AllocateResizableBuffer(CMemoryPool* pool, const int64_t size,
-                                    shared_ptr[ResizableBuffer]* out)
+                                    shared_ptr[CResizableBuffer]* out)
 
-    cdef cppclass ResizableBuffer(CBuffer):
-        CStatus Resize(int64_t nbytes)
-        CStatus Reserve(int64_t nbytes)
-
-    cdef cppclass PoolBuffer(ResizableBuffer):
+    cdef cppclass PoolBuffer(CResizableBuffer):
         PoolBuffer()
         PoolBuffer(CMemoryPool*)
 
@@ -635,7 +636,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
 
     cdef cppclass CBufferOutputStream \
             " arrow::io::BufferOutputStream"(OutputStream):
-        CBufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
+        CBufferOutputStream(const shared_ptr[CResizableBuffer]& buffer)
 
     cdef cppclass CMockOutputStream \
             " arrow::io::MockOutputStream"(OutputStream):
@@ -661,6 +662,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MessageType_V1" arrow::ipc::MetadataVersion::V1"
         MessageType_V2" arrow::ipc::MetadataVersion::V2"
         MessageType_V3" arrow::ipc::MetadataVersion::V3"
+        MessageType_V4" arrow::ipc::MetadataVersion::V4"
 
     cdef cppclass CMessage" arrow::ipc::Message":
         CStatus Open(const shared_ptr[CBuffer]& metadata,
@@ -926,3 +928,26 @@ cdef extern from 'arrow/python/init.h':
 
 cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
     void set_numpy_nan(object o)
+
+
+cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
+    enum CompressionType" arrow::Compression::type":
+        CompressionType_UNCOMPRESSED" arrow::Compression::UNCOMPRESSED"
+        CompressionType_SNAPPY" arrow::Compression::SNAPPY"
+        CompressionType_GZIP" arrow::Compression::GZIP"
+        CompressionType_BROTLI" arrow::Compression::BROTLI"
+        CompressionType_ZSTD" arrow::Compression::ZSTD"
+        CompressionType_LZ4" arrow::Compression::LZ4"
+
+    cdef cppclass CCodec" arrow::Codec":
+        @staticmethod
+        CStatus Create(CompressionType codec, unique_ptr[CCodec]* out)
+
+        CStatus Decompress(int64_t input_len, const uint8_t* input,
+                           int64_t output_len, uint8_t* output_buffer)
+
+        CStatus Compress(int64_t input_len, const uint8_t* input,
+                         int64_t output_buffer_len, uint8_t* output_buffer,
+                         int64_t* output_length)
+
+        int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 495e31b..619ba36 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -600,6 +600,23 @@ cdef class Buffer:
         # TODO(wesm): buffer slicing
         raise NotImplementedError
 
+    def equals(self, Buffer other):
+        """
+        Determine if two buffers contain exactly the same data
+
+        Parameters
+        ----------
+        other : Buffer
+
+        Returns
+        -------
+        are_equal : True if buffer contents and size are equal
+        """
+        cdef c_bool result = False
+        with nogil:
+            result = self.buffer.get().Equals(deref(other.buffer.get()))
+        return result
+
     def to_pybytes(self):
         return cp.PyBytes_FromStringAndSize(
             <const char*>self.buffer.get().data(),
@@ -644,13 +661,37 @@ cdef class Buffer:
         return self.size
 
 
+cdef class ResizableBuffer(Buffer):
+
+    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer):
+        self.init(<shared_ptr[CBuffer]> buffer)
+
+    def resize(self, int64_t new_size, shrink_to_fit=False):
+        """
+        Resize buffer to indicated size
+
+        Parameters
+        ----------
+        new_size : int64_t
+            New size of buffer (padding may be added internally)
+        shrink_to_fit : boolean, default False
+            If new_size is less than the current size, shrink internal
+            capacity, otherwise leave at current capacity
+        """
+        cdef c_bool c_shrink_to_fit = shrink_to_fit
+        with nogil:
+            check_status((<CResizableBuffer*> self.buffer.get())
+                         .Resize(new_size, c_shrink_to_fit))
+
+
 cdef shared_ptr[PoolBuffer] _allocate_buffer(CMemoryPool* pool):
     cdef shared_ptr[PoolBuffer] result
     result.reset(new PoolBuffer(pool))
     return result
 
 
-def allocate_buffer(int64_t size, MemoryPool pool=None):
+def allocate_buffer(int64_t size, MemoryPool memory_pool=None,
+                    resizable=False):
     """
     Allocate mutable fixed-size buffer
 
@@ -658,17 +699,27 @@ def allocate_buffer(int64_t size, MemoryPool pool=None):
     ----------
     size : int
         Number of bytes to allocate (plus internal padding)
-    pool : MemoryPool, optional
+    memory_pool : MemoryPool, optional
         Uses default memory pool if not provided
+    resizable : boolean, default False
+
+    Returns
+    -------
+    buffer : Buffer or ResizableBuffer
     """
     cdef:
         shared_ptr[CBuffer] buffer
-        CMemoryPool* cpool = maybe_unbox_memory_pool(pool)
+        shared_ptr[CResizableBuffer] rz_buffer
+        CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool)
 
-    with nogil:
-        check_status(AllocateBuffer(cpool, size, &buffer))
-
-    return pyarrow_wrap_buffer(buffer)
+    if resizable:
+        with nogil:
+            check_status(AllocateResizableBuffer(cpool, size, &rz_buffer))
+        return pyarrow_wrap_resizable_buffer(rz_buffer)
+    else:
+        with nogil:
+            check_status(AllocateBuffer(cpool, size, &buffer))
+        return pyarrow_wrap_buffer(buffer)
 
 
 cdef class BufferOutputStream(NativeFile):
@@ -679,7 +730,7 @@ cdef class BufferOutputStream(NativeFile):
     def __cinit__(self, MemoryPool memory_pool=None):
         self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
         self.wr_file.reset(new CBufferOutputStream(
-            <shared_ptr[ResizableBuffer]> self.buffer))
+            <shared_ptr[CResizableBuffer]> self.buffer))
         self.is_readable = 0
         self.is_writeable = 1
         self.is_open = True
@@ -783,3 +834,145 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     else:
         raise TypeError('Unable to read from object of type: {0}'
                         .format(type(source)))
+
+
+# ---------------------------------------------------------------------
+
+cdef CompressionType _get_compression_type(object name):
+    if name is None or name == 'uncompressed':
+        return CompressionType_UNCOMPRESSED
+    elif name == 'snappy':
+        return CompressionType_SNAPPY
+    elif name == 'gzip':
+        return CompressionType_GZIP
+    elif name == 'brotli':
+        return CompressionType_BROTLI
+    elif name == 'zstd':
+        return CompressionType_ZSTD
+    elif name == 'lz4':
+        return CompressionType_LZ4
+    else:
+        raise ValueError("Unrecognized compression type: {0}"
+                         .format(str(name)))
+
+
+def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
+    """
+    Compress pyarrow.Buffer or Python object supporting the buffer (memoryview)
+    protocol
+
+    Parameters
+    ----------
+    buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
+    codec : string, default 'lz4'
+        Compression codec.
+        Supported types: {'brotli', 'gzip', 'lz4', 'snappy', 'zstd'}
+    asbytes : boolean, default False
+        Return result as Python bytes object, otherwise Buffer
+    memory_pool : MemoryPool, default None
+        Memory pool to use for buffer allocations, if any
+
+    Returns
+    -------
+    compressed : pyarrow.Buffer or bytes (if asbytes=True)
+    """
+    cdef:
+        CompressionType c_codec = _get_compression_type(codec)
+        unique_ptr[CCodec] compressor
+        cdef CBuffer* c_buf
+        cdef PyObject* pyobj
+        cdef ResizableBuffer out_buf
+
+    with nogil:
+        check_status(CCodec.Create(c_codec, &compressor))
+
+    if not isinstance(buf, Buffer):
+        buf = frombuffer(buf)
+
+    c_buf = (<Buffer> buf).buffer.get()
+
+    cdef int64_t max_output_size = (compressor.get()
+                                    .MaxCompressedLen(c_buf.size(),
+                                                      c_buf.data()))
+    cdef uint8_t* output_buffer = NULL
+
+    if asbytes:
+        pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size)
+        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj)
+    else:
+        out_buf = allocate_buffer(max_output_size, memory_pool=memory_pool,
+                                  resizable=True)
+        output_buffer = out_buf.buffer.get().mutable_data()
+
+    cdef int64_t output_length = 0
+    with nogil:
+        check_status(compressor.get()
+                     .Compress(c_buf.size(), c_buf.data(),
+                               max_output_size, output_buffer,
+                               &output_length))
+
+    if asbytes:
+        cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length)
+        return PyObject_to_object(pyobj)
+    else:
+        out_buf.resize(output_length)
+        return out_buf
+
+
+def decompress(object buf, decompressed_size=None, codec='lz4',
+               asbytes=False, memory_pool=None):
+    """
+    Decompress data from buffer-like object
+
+    Parameters
+    ----------
+    buf : pyarrow.Buffer, bytes, or memoryview-compatible object
+    decompressed_size : int64_t, default None
+        Size in bytes of the decompressed result. Currently required for
+        all codecs; a ValueError is raised if it is not specified
+    codec : string, default 'lz4'
+        Compression codec.
+        Supported types: {'brotli', 'gzip', 'lz4', 'snappy', 'zstd'}
+    asbytes : boolean, default False
+        Return result as Python bytes object, otherwise Buffer
+    memory_pool : MemoryPool, default None
+        Memory pool to use for buffer allocations, if any
+
+    Returns
+    -------
+    uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
+    """
+    cdef:
+        CompressionType c_codec = _get_compression_type(codec)
+        unique_ptr[CCodec] compressor
+        cdef CBuffer* c_buf
+        cdef Buffer out_buf
+
+    with nogil:
+        check_status(CCodec.Create(c_codec, &compressor))
+
+    if not isinstance(buf, Buffer):
+        buf = frombuffer(buf)
+
+    c_buf = (<Buffer> buf).buffer.get()
+
+    if decompressed_size is None:
+        raise ValueError("Must pass decompressed_size for {0} codec"
+                         .format(codec))
+
+    cdef int64_t output_size = decompressed_size
+    cdef uint8_t* output_buffer = NULL
+
+    if asbytes:
+        pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size)
+        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf)
+    else:
+        out_buf = allocate_buffer(output_size, memory_pool=memory_pool)
+        output_buffer = out_buf.buffer.get().mutable_data()
+
+    with nogil:
+        check_status(compressor.get()
+                     .Decompress(c_buf.size(), c_buf.data(),
+                                 output_size, output_buffer))
+
+    return pybuf if asbytes else out_buf
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 5abb72b..90f749d 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -323,6 +323,11 @@ cdef class Buffer:
     cdef void init(self, const shared_ptr[CBuffer]& buffer)
 
 
+cdef class ResizableBuffer(Buffer):
+
+    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer)
+
+
 cdef class NativeFile:
     cdef:
         shared_ptr[RandomAccessFile] rd_file
@@ -343,6 +348,8 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
 cdef get_writer(object source, shared_ptr[OutputStream]* writer)
 
 cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf)
+cdef public object pyarrow_wrap_resizable_buffer(
+    const shared_ptr[CResizableBuffer]& buf)
 cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type)
 cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field)
 cdef public object pyarrow_wrap_schema(const shared_ptr[CSchema]& type)
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index bc0e94e..f2e8653 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -30,7 +30,7 @@ import collections
 import pyarrow
 
 from pyarrow.lib cimport Buffer, NativeFile, check_status
-from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer,
+from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
 
 
diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi
index bf670c5..9776f2a 100644
--- a/python/pyarrow/public-api.pxi
+++ b/python/pyarrow/public-api.pxi
@@ -43,6 +43,13 @@ cdef public api object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf):
     return result
 
 
+cdef public api object pyarrow_wrap_resizable_buffer(
+    const shared_ptr[CResizableBuffer]& buf):
+    cdef ResizableBuffer result = ResizableBuffer()
+    result.init_rz(buf)
+    return result
+
+
 cdef public api bint pyarrow_is_data_type(object type_):
     return isinstance(type_, DataType)
 
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 98c465a..e60dd35 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -182,6 +182,42 @@ def test_allocate_buffer():
     assert buf.to_pybytes()[:5] == bit
 
 
+def test_allocate_buffer_resizable():
+    buf = pa.allocate_buffer(100, resizable=True)
+    assert isinstance(buf, pa.ResizableBuffer)
+
+    buf.resize(200)
+    assert buf.size == 200
+
+
+def test_compress_decompress():
+    INPUT_SIZE = 10000
+    test_data = (np.random.randint(0, 255, size=INPUT_SIZE)
+                 .astype(np.uint8)
+                 .tostring())
+    test_buf = pa.frombuffer(test_data)
+
+    codecs = ['lz4', 'snappy', 'gzip', 'zstd', 'brotli']
+    for codec in codecs:
+        compressed_buf = pa.compress(test_buf, codec=codec)
+        compressed_bytes = pa.compress(test_data, codec=codec, asbytes=True)
+
+        assert isinstance(compressed_bytes, bytes)
+
+        decompressed_buf = pa.decompress(compressed_buf, INPUT_SIZE,
+                                         codec=codec)
+        decompressed_bytes = pa.decompress(compressed_bytes, INPUT_SIZE,
+                                           codec=codec, asbytes=True)
+
+        assert isinstance(decompressed_bytes, bytes)
+
+        assert decompressed_buf.equals(test_buf)
+        assert decompressed_bytes == test_data
+
+        with pytest.raises(ValueError):
+            pa.decompress(compressed_bytes, codec=codec)
+
+
 def test_buffer_memoryview_is_immutable():
     val = b'some data'
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].