You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/19 18:08:21 UTC

[arrow] branch master updated: GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bf8780d0ff GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)
bf8780d0ff is described below

commit bf8780d0ff794c50312d799a9e877430e99dcf8b
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 19 13:08:14 2023 -0500

    GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)
    
    
    * Closes: #33767
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 go/arrow/cdata/cdata_exports.go        |  2 +-
 go/arrow/cdata/cdata_fulltest.c        | 38 ++++++++++++++++++++++++----------
 go/arrow/cdata/cdata_test.go           | 10 +++++++++
 go/arrow/cdata/cdata_test_framework.go | 13 ++++++++++++
 4 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
index 555ab42adc..3aed7069a7 100644
--- a/go/arrow/cdata/cdata_exports.go
+++ b/go/arrow/cdata/cdata_exports.go
@@ -427,7 +427,7 @@ func (rr cRecordReader) next(out *CArrowArray) int {
 		ExportArrowRecordBatch(rr.rdr.Record(), out, nil)
 		return 0
 	}
-	releaseArr(out)
+	C.ArrowArrayMarkReleased(out)
 	return 0
 }
 
diff --git a/go/arrow/cdata/cdata_fulltest.c b/go/arrow/cdata/cdata_fulltest.c
index 06ea477ba0..837d347d53 100644
--- a/go/arrow/cdata/cdata_fulltest.c
+++ b/go/arrow/cdata/cdata_fulltest.c
@@ -65,7 +65,7 @@ static void release_int32_array(struct ArrowArray* array) {
     free(array->buffers);
     // mark released
     array->release = NULL;
-    test1_released = true;    
+    test1_released = true;
 }
 
 void export_int32_array(const int32_t* data, int64_t nitems, struct ArrowArray* array) {
@@ -91,7 +91,7 @@ void export_int32_array(const int32_t* data, int64_t nitems, struct ArrowArray*
 
 
 static void release_primitive(struct ArrowSchema* schema) {
-    free((void *)schema->format);    
+    free((void *)schema->format);
     schema->release = NULL;
 }
 
@@ -141,7 +141,7 @@ void test_primitive(struct ArrowSchema* schema, const char* fmt) {
         .children = NULL,
         .dictionary = NULL,
         // bookkeeping
-        .release = &release_primitive,    
+        .release = &release_primitive,
     };
 }
 
@@ -170,12 +170,12 @@ struct ArrowSchema** test_lists(const char** fmts, const char** names, const int
             schemas[i-1]->children = &schemas[i];
             schemas[i]->flags = nullflags[i-1];
         }
-    }    
+    }
     return schemas;
 }
 
 struct ArrowSchema** fill_structs(const char** fmts, const char** names, int64_t* flags, const int n) {
-    struct ArrowSchema** schemas = malloc(sizeof(struct ArrowSchema*)*n);        
+    struct ArrowSchema** schemas = malloc(sizeof(struct ArrowSchema*)*n);
     for (int i = 0; i < n; ++i) {
         schemas[i] = malloc(sizeof(struct ArrowSchema));
         *schemas[i] = (struct ArrowSchema) {
@@ -252,7 +252,7 @@ struct streamcounter {
 static int stream_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) {
     out->children = malloc(sizeof(struct ArrowSchema*)*2);
     out->n_children = 2;
-    
+
     out->children[0] = malloc(sizeof(struct ArrowSchema));
     *out->children[0] = (struct ArrowSchema) {
         .format = "i",
@@ -290,7 +290,7 @@ static void release_stream(struct ArrowArrayStream* st) {
 
 static void release_the_array(struct ArrowArray* out) {
     for (int i = 0; i < out->n_children; ++i) {
-        ArrowArrayRelease(out->children[i]);        
+        ArrowArrayRelease(out->children[i]);
     }
     free((void*)out->children);
     free(out->buffers);
@@ -322,17 +322,17 @@ void export_str_array(const char* data, const int32_t* offsets, int64_t nitems,
 
     out->buffers = (const void**)malloc(sizeof(void*) * out->n_buffers);
     assert(out->buffers != NULL);
-    out->buffers[0] = NULL;    
+    out->buffers[0] = NULL;
     out->buffers[1] = offsets;
     out->buffers[2] = data;
 }
 
-static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {    
+static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {
     struct streamcounter* cnter = (struct streamcounter*)(st->private_data);
     if (cnter->n == cnter->max) {
         ArrowArrayMarkReleased(out);
         return 0;
-    }    
+    }
 
     cnter->n++;
 
@@ -363,7 +363,7 @@ static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {
     offsets[1] = 3;
     offsets[2] = 6;
     offsets[3] = 9;
-    export_str_array(strdata, offsets, 3, out->children[1]);    
+    export_str_array(strdata, offsets, 3, out->children[1]);
 
     return 0;
 }
@@ -378,3 +378,19 @@ void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out)
     out->release = &release_stream;
     out->private_data = cnt;
 }
+
+int test_exported_stream(struct ArrowArrayStream* stream) {
+  // Drain the stream as a consumer: release each batch, then the stream.
+  int rc = 0;
+  while (1) {
+    struct ArrowArray array;
+    // Garbage - implementation should not try to call it, though!
+    array.release = (void (*)(struct ArrowArray*))0xDEADBEEF;
+    rc = stream->get_next(stream, &array);
+    // Stop on error or end of stream (array marked released, release == NULL).
+    if (rc != 0 || array.release == NULL) break;
+    array.release(&array);  // each returned batch is owned by the consumer
+  }
+  stream->release(stream);  // released on both the error and success paths
+  return rc;
+}
diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go
index 84d212f250..f76e2fe1d4 100644
--- a/go/arrow/cdata/cdata_test.go
+++ b/go/arrow/cdata/cdata_test.go
@@ -747,3 +747,13 @@ func TestEmptyDictExport(t *testing.T) {
 	assert.Nil(t, out.dictionary.children)
 	assert.Nil(t, out.dictionary.dictionary)
 }
+
+func TestRecordReaderExport(t *testing.T) {
+	// Regression test for apache/arrow#33767: get_next must not call a garbage release.
+	reclist := arrdata.Records["primitives"]
+	rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
+	if err == nil {
+		err = exportedStreamTest(rdr)
+	}
+	assert.NoError(t, err, "exported stream test failed")
+}
diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go
index 0f062ded0d..8d0f0a5a82 100644
--- a/go/arrow/cdata/cdata_test_framework.go
+++ b/go/arrow/cdata/cdata_test_framework.go
@@ -53,8 +53,10 @@ package cdata
 // struct ArrowSchema** test_struct(const char** fmts, const char** names, int64_t* flags, const int n);
 // struct ArrowSchema** test_map(const char** fmts, const char** names, int64_t* flags, const int n);
 // struct ArrowSchema** test_schema(const char** fmts, const char** names, int64_t* flags, const int n);
+// int test_exported_stream(struct ArrowArrayStream* stream);
 import "C"
 import (
+	"fmt"
 	"unsafe"
 
 	"github.com/apache/arrow/go/v11/arrow"
@@ -264,3 +266,14 @@ func arrayStreamTest() *CArrowArrayStream {
 	C.setup_array_stream_test(2, st)
 	return st
 }
+
+func exportedStreamTest(reader array.RecordReader) error {
+	out := C.get_test_stream()
+	defer C.free(unsafe.Pointer(out))
+	ExportRecordReader(reader, out)
+	// A non-zero rc is the error code returned by the exported stream's get_next.
+	if rc := C.test_exported_stream(out); rc != 0 {
+		return fmt.Errorf("exported stream test failed with return code %d", int(rc))
+	}
+	return nil
+}