You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 23:50:28 UTC

[arrow] branch maint-10.0.x updated (1f9db90db6 -> 113ccb2b4b)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a change to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git


    from 1f9db90db6 MINOR: [R] Fix for dev purrr (#14581)
     new 60e1d81ed9 ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
     new 15d1df4240 ARROW-18322: [Python] Add PYARROW_WITH_FLIGHT to PyArrow C++ cmake (#14642)
     new 113ccb2b4b ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 go/arrow/internal/dictutils/dict.go |  10 ++-
 go/arrow/ipc/ipc.go                 |   8 ++
 go/arrow/ipc/ipc_test.go            | 156 +++++++++++++++++++++++++++++++++++-
 go/arrow/ipc/writer.go              |  22 ++---
 python/pyarrow/src/CMakeLists.txt   |   2 +-
 python/setup.py                     |   2 +
 6 files changed, 186 insertions(+), 14 deletions(-)


[arrow] 03/03: ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)

Posted by ko...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 113ccb2b4b458c8c83894b269521b34e16ff68a8
Author: Laurent Quérel <la...@gmail.com>
AuthorDate: Tue Nov 15 13:36:06 2022 -0800

    ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)
    
    Test not working because of a potential issue in dictionary management.
    
    Lead-authored-by: Laurent Querel <l....@f5.com>
    Co-authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/arrow/ipc/ipc.go      |  8 ++++++
 go/arrow/ipc/ipc_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---
 go/arrow/ipc/writer.go   | 22 +++++++++--------
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 71d810b136..44a19b8ae8 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -70,6 +70,7 @@ type config struct {
 	compressNP         int
 	ensureNativeEndian bool
 	noAutoSchema       bool
+	emitDictDeltas     bool
 }
 
 func newConfig(opts ...Option) *config {
@@ -160,6 +161,13 @@ func WithDelayReadSchema(v bool) Option {
 	}
 }
 
+// WithDictionaryDeltas specifies whether or not to emit dictionary deltas.
+func WithDictionaryDeltas(v bool) Option {
+	return func(cfg *config) {
+		cfg.emitDictDeltas = v
+	}
+}
+
 var (
 	_ arrio.Reader = (*Reader)(nil)
 	_ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index d6d0abf225..1bde7192a5 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -24,12 +24,13 @@ import (
 	"strconv"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/apache/arrow/go/v10/arrow"
 	"github.com/apache/arrow/go/v10/arrow/array"
 	"github.com/apache/arrow/go/v10/arrow/ipc"
 	"github.com/apache/arrow/go/v10/arrow/memory"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )
 
 func TestArrow12072(t *testing.T) {
@@ -345,7 +346,7 @@ func TestDictionary(t *testing.T) {
 
 	// IPC writer and reader
 	var bufWriter bytes.Buffer
-	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false))
 	defer ipcWriter.Close()
 
 	bufReader := bytes.NewReader([]byte{})
@@ -391,6 +392,63 @@ func TestDictionary(t *testing.T) {
 	ipcReader.Release()
 }
 
+// ARROW-18326
+func TestDictionaryDeltas(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	// A schema with a single dictionary field
+	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+		IndexType: arrow.PrimitiveTypes.Uint16,
+		ValueType: arrow.BinaryTypes.String,
+		Ordered:   false,
+	}}}, nil)
+
+	// IPC writer and reader
+	var bufWriter bytes.Buffer
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true))
+	defer ipcWriter.Close()
+
+	bufReader := bytes.NewReader([]byte{})
+	var ipcReader *ipc.Reader
+
+	bldr := array.NewBuilder(pool, schema.Field(0).Type)
+	defer bldr.Release()
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+	arr := bldr.NewArray()
+	defer arr.Release()
+	// Create a first record with field = "value_0"
+	record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err := record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	require.JSONEq(t, string(expectedJson), string(json))
+
+	// Create a second record with field = "value_1"
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+	arr = bldr.NewArray()
+	defer arr.Release()
+	record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err = record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	// field = "value_0" but should be "value_1"
+	require.JSONEq(t, string(expectedJson), string(json))
+	require.NoError(t, ipcReader.Err())
+	ipcReader.Release()
+}
+
 // Encode and decode a record over a tuple of IPC writer and reader.
 // IPC writer and reader are the same from one call to another.
 func encodeDecodeIpcStream(t *testing.T,
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 9af88d9c2d..ef20e70815 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -99,11 +99,12 @@ type Writer struct {
 func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
 	cfg := newConfig(opts...)
 	return &Writer{
-		mem:        cfg.alloc,
-		pw:         pw,
-		schema:     cfg.schema,
-		codec:      cfg.codec,
-		compressNP: cfg.compressNP,
+		mem:            cfg.alloc,
+		pw:             pw,
+		schema:         cfg.schema,
+		codec:          cfg.codec,
+		compressNP:     cfg.compressNP,
+		emitDictDeltas: cfg.emitDictDeltas,
 	}
 }
 
@@ -111,11 +112,12 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
 func NewWriter(w io.Writer, opts ...Option) *Writer {
 	cfg := newConfig(opts...)
 	return &Writer{
-		w:      w,
-		mem:    cfg.alloc,
-		pw:     &swriter{w: w},
-		schema: cfg.schema,
-		codec:  cfg.codec,
+		w:              w,
+		mem:            cfg.alloc,
+		pw:             &swriter{w: w},
+		schema:         cfg.schema,
+		codec:          cfg.codec,
+		emitDictDeltas: cfg.emitDictDeltas,
 	}
 }
 


[arrow] 01/03: ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)

Posted by ko...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 60e1d81ed9c7ff409d96f1c658698df19ce79748
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Tue Nov 15 10:55:56 2022 -0500

    ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
    
    Fix dictionary replacement for IPC streams. Currently they incorrectly get concatenated together instead of replaced when not using deltas. This will properly replace dictionaries when encountering a non-delta dictionary message.
    
    Authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/arrow/internal/dictutils/dict.go | 10 +++-
 go/arrow/ipc/ipc_test.go            | 94 +++++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/go/arrow/internal/dictutils/dict.go b/go/arrow/internal/dictutils/dict.go
index 9dd17c6540..561923f626 100644
--- a/go/arrow/internal/dictutils/dict.go
+++ b/go/arrow/internal/dictutils/dict.go
@@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) {
 	memo.id2dict[id] = append(d, v)
 }
 
+// AddOrReplace puts the provided dictionary into the memo table. If it
+// already exists, then the new data will replace it. Otherwise it is added
+// to the memo table.
 func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool {
 	d, ok := memo.id2dict[id]
 	if ok {
-		d = append(d, v)
+		// replace the dictionary and release any existing ones
+		for _, dict := range d {
+			dict.Release()
+		}
+		d[0] = v
+		d = d[:1]
 	} else {
 		d = []arrow.ArrayData{v}
 	}
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2f0816c0d0..d6d0abf225 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) {
 		n++
 	}
 }
+
+// ARROW-18317
+func TestDictionary(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	// A schema with a single dictionary field
+	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+		IndexType: arrow.PrimitiveTypes.Uint16,
+		ValueType: arrow.BinaryTypes.String,
+		Ordered:   false,
+	}}}, nil)
+
+	// IPC writer and reader
+	var bufWriter bytes.Buffer
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+	defer ipcWriter.Close()
+
+	bufReader := bytes.NewReader([]byte{})
+	var ipcReader *ipc.Reader
+
+	bldr := array.NewBuilder(pool, schema.Field(0).Type)
+	defer bldr.Release()
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+	arr := bldr.NewArray()
+	defer arr.Release()
+	// Create a first record with field = "value_0"
+	record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err := record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	require.JSONEq(t, string(expectedJson), string(json))
+
+	// Create a second record with field = "value_1"
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+	arr = bldr.NewArray()
+	defer arr.Release()
+	record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+
+	// record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`))
+	// require.NoError(t, err)
+	defer record.Release()
+
+	expectedJson, err = record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	// field = "value_0" but should be "value_1"
+	require.JSONEq(t, string(expectedJson), string(json))
+	require.NoError(t, ipcReader.Err())
+	ipcReader.Release()
+}
+
+// Encode and decode a record over a tuple of IPC writer and reader.
+// IPC writer and reader are the same from one call to another.
+func encodeDecodeIpcStream(t *testing.T,
+	record arrow.Record,
+	bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
+	bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
+
+	// Serialize the record via an ipc writer
+	if err := ipcWriter.Write(record); err != nil {
+		return nil, ipcReader, err
+	}
+	serializedRecord := bufWriter.Bytes()
+	bufWriter.Reset()
+
+	// Deserialize the record via an ipc reader
+	bufReader.Reset(serializedRecord)
+	if ipcReader == nil {
+		newIpcReader, err := ipc.NewReader(bufReader)
+		if err != nil {
+			return nil, newIpcReader, err
+		}
+		ipcReader = newIpcReader
+	}
+	ipcReader.Next()
+	record = ipcReader.Record()
+
+	// Return the decoded record as a json string
+	json, err := record.MarshalJSON()
+	if err != nil {
+		return nil, ipcReader, err
+	}
+	return json, ipcReader, nil
+}


[arrow] 02/03: ARROW-18322: [Python] Add PYARROW_WITH_FLIGHT to PyArrow C++ cmake (#14642)

Posted by ko...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 15d1df424012c81fcc8a1dccf6b5949aeeb4af1a
Author: Alenka Frim <Al...@users.noreply.github.com>
AuthorDate: Tue Nov 15 21:56:54 2022 +0100

    ARROW-18322: [Python] Add PYARROW_WITH_FLIGHT to PyArrow C++ cmake (#14642)
    
    This PR adds the use of the `PYARROW_WITH_FLIGHT` environment variable when building PyArrow C++. With this change, PyArrow will be able to build successfully even when Arrow C++ is installed without Arrow Flight ([example](https://github.com/OSGeo/gdal/blob/58ab718542a7b5ceb94e3f24a5acc0b18ac1efb2/docker/ubuntu-full/Dockerfile#L223-L231), JFrog Artifactory) but the `ARROW_FLIGHT` flag is not registered as being set to off.
    Note: `PYARROW_WITH_FLIGHT=0` is needed when building PyArrow.
    
    I tested the change on my regular build with Arrow Flight but with removed `libarrow_flight*` libs. Without this patch I get:
    ```
    CMake Error at /Users/alenkafrim/repos/arrow/dist/lib/cmake/ArrowFlight/ArrowFlightTargets.cmake:93 (message):
      The imported target "ArrowFlight::arrow_flight_shared" references the file
    
         "/Users/alenkafrim/repos/arrow/dist/lib/libarrow_flight.1100.0.0.dylib"
    
      but this file does not exist.  Possible reasons include:
    
      * The file was deleted, renamed, or moved to another location.
    
      * An install or uninstall procedure did not complete successfully.
    
      * The installation package was faulty and contained
    
         "/Users/alenkafrim/repos/arrow/dist/lib/cmake/ArrowFlight/ArrowFlightTargets.cmake"
    
      but not all the files it references.
    
    Call Stack (most recent call first):
      /Users/alenkafrim/repos/arrow/dist/lib/cmake/ArrowFlight/ArrowFlightConfig.cmake:56 (include)
      CMakeLists.txt:262 (find_package)
    ```
    
    With this patch PyArrow builds successfully.
    
    Authored-by: Alenka Frim <fr...@gmail.com>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 python/pyarrow/src/CMakeLists.txt | 2 +-
 python/setup.py                   | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/src/CMakeLists.txt b/python/pyarrow/src/CMakeLists.txt
index 8fdfdfaf7e..01e2ed78d8 100644
--- a/python/pyarrow/src/CMakeLists.txt
+++ b/python/pyarrow/src/CMakeLists.txt
@@ -253,7 +253,7 @@ if(ARROW_BUILD_STATIC AND MSVC)
   target_compile_definitions(arrow_python_static PUBLIC ARROW_STATIC)
 endif()
 
-if(ARROW_FLIGHT AND ARROW_BUILD_SHARED)
+if(PYARROW_WITH_FLIGHT AND ARROW_FLIGHT AND ARROW_BUILD_SHARED)
    # Must link to shared libarrow_flight: we don't want to link more than one
    # copy of gRPC into the eventual Cython shared object, otherwise gRPC calls
    # fail with weird errors due to multiple copies of global static state (The
diff --git a/python/setup.py b/python/setup.py
index 6ca87c7940..b0587fbdf3 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -274,6 +274,8 @@ class build_ext(_build_ext):
                               'PYARROW_WITH_PARQUET_ENCRYPTION')
             append_cmake_bool(self.with_hdfs,
                               'PYARROW_WITH_HDFS')
+            append_cmake_bool(self.with_flight,
+                              'PYARROW_WITH_FLIGHT')
 
             # Windows
             if self.cmake_generator: