You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/19 18:08:21 UTC
[arrow] branch master updated: GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new bf8780d0ff GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)
bf8780d0ff is described below
commit bf8780d0ff794c50312d799a9e877430e99dcf8b
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 19 13:08:14 2023 -0500
GH-33767: [Go] Clear out parameter in ArrowArrayStream.get_next (#33768)
* Closes: #33767
Authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
go/arrow/cdata/cdata_exports.go | 2 +-
go/arrow/cdata/cdata_fulltest.c | 38 ++++++++++++++++++++++++----------
go/arrow/cdata/cdata_test.go | 10 +++++++++
go/arrow/cdata/cdata_test_framework.go | 13 ++++++++++++
4 files changed, 51 insertions(+), 12 deletions(-)
diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
index 555ab42adc..3aed7069a7 100644
--- a/go/arrow/cdata/cdata_exports.go
+++ b/go/arrow/cdata/cdata_exports.go
@@ -427,7 +427,7 @@ func (rr cRecordReader) next(out *CArrowArray) int {
ExportArrowRecordBatch(rr.rdr.Record(), out, nil)
return 0
}
- releaseArr(out)
+ C.ArrowArrayMarkReleased(out)
return 0
}
diff --git a/go/arrow/cdata/cdata_fulltest.c b/go/arrow/cdata/cdata_fulltest.c
index 06ea477ba0..837d347d53 100644
--- a/go/arrow/cdata/cdata_fulltest.c
+++ b/go/arrow/cdata/cdata_fulltest.c
@@ -65,7 +65,7 @@ static void release_int32_array(struct ArrowArray* array) {
free(array->buffers);
// mark released
array->release = NULL;
- test1_released = true;
+ test1_released = true;
}
void export_int32_array(const int32_t* data, int64_t nitems, struct ArrowArray* array) {
@@ -91,7 +91,7 @@ void export_int32_array(const int32_t* data, int64_t nitems, struct ArrowArray*
static void release_primitive(struct ArrowSchema* schema) {
- free((void *)schema->format);
+ free((void *)schema->format);
schema->release = NULL;
}
@@ -141,7 +141,7 @@ void test_primitive(struct ArrowSchema* schema, const char* fmt) {
.children = NULL,
.dictionary = NULL,
// bookkeeping
- .release = &release_primitive,
+ .release = &release_primitive,
};
}
@@ -170,12 +170,12 @@ struct ArrowSchema** test_lists(const char** fmts, const char** names, const int
schemas[i-1]->children = &schemas[i];
schemas[i]->flags = nullflags[i-1];
}
- }
+ }
return schemas;
}
struct ArrowSchema** fill_structs(const char** fmts, const char** names, int64_t* flags, const int n) {
- struct ArrowSchema** schemas = malloc(sizeof(struct ArrowSchema*)*n);
+ struct ArrowSchema** schemas = malloc(sizeof(struct ArrowSchema*)*n);
for (int i = 0; i < n; ++i) {
schemas[i] = malloc(sizeof(struct ArrowSchema));
*schemas[i] = (struct ArrowSchema) {
@@ -252,7 +252,7 @@ struct streamcounter {
static int stream_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) {
out->children = malloc(sizeof(struct ArrowSchema*)*2);
out->n_children = 2;
-
+
out->children[0] = malloc(sizeof(struct ArrowSchema));
*out->children[0] = (struct ArrowSchema) {
.format = "i",
@@ -290,7 +290,7 @@ static void release_stream(struct ArrowArrayStream* st) {
static void release_the_array(struct ArrowArray* out) {
for (int i = 0; i < out->n_children; ++i) {
- ArrowArrayRelease(out->children[i]);
+ ArrowArrayRelease(out->children[i]);
}
free((void*)out->children);
free(out->buffers);
@@ -322,17 +322,17 @@ void export_str_array(const char* data, const int32_t* offsets, int64_t nitems,
out->buffers = (const void**)malloc(sizeof(void*) * out->n_buffers);
assert(out->buffers != NULL);
- out->buffers[0] = NULL;
+ out->buffers[0] = NULL;
out->buffers[1] = offsets;
out->buffers[2] = data;
}
-static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {
+static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {
struct streamcounter* cnter = (struct streamcounter*)(st->private_data);
if (cnter->n == cnter->max) {
ArrowArrayMarkReleased(out);
return 0;
- }
+ }
cnter->n++;
@@ -363,7 +363,7 @@ static int next_record(struct ArrowArrayStream* st, struct ArrowArray* out) {
offsets[1] = 3;
offsets[2] = 6;
offsets[3] = 9;
- export_str_array(strdata, offsets, 3, out->children[1]);
+ export_str_array(strdata, offsets, 3, out->children[1]);
return 0;
}
@@ -378,3 +378,19 @@ void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out)
out->release = &release_stream;
out->private_data = cnt;
}
+
+int test_exported_stream(struct ArrowArrayStream* stream) {
+ while (1) {
+ struct ArrowArray array;
+ // Garbage - implementation should not try to call it, though!
+ array.release = (void*)0xDEADBEEF;
+ int rc = stream->get_next(stream, &array);
+ if (rc != 0) return rc;
+
+ if (array.release == NULL) {
+ stream->release(stream);
+ break;
+ }
+ }
+ return 0;
+}
diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go
index 84d212f250..f76e2fe1d4 100644
--- a/go/arrow/cdata/cdata_test.go
+++ b/go/arrow/cdata/cdata_test.go
@@ -747,3 +747,13 @@ func TestEmptyDictExport(t *testing.T) {
assert.Nil(t, out.dictionary.children)
assert.Nil(t, out.dictionary.dictionary)
}
+
+func TestRecordReaderExport(t *testing.T) {
+ // Regression test for apache/arrow#33767
+ reclist := arrdata.Records["primitives"]
+ rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist)
+
+ if err := exportedStreamTest(rdr); err != nil {
+ t.Fatalf("Failed to test exported stream: %#v", err)
+ }
+}
diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go
index 0f062ded0d..8d0f0a5a82 100644
--- a/go/arrow/cdata/cdata_test_framework.go
+++ b/go/arrow/cdata/cdata_test_framework.go
@@ -53,8 +53,10 @@ package cdata
// struct ArrowSchema** test_struct(const char** fmts, const char** names, int64_t* flags, const int n);
// struct ArrowSchema** test_map(const char** fmts, const char** names, int64_t* flags, const int n);
// struct ArrowSchema** test_schema(const char** fmts, const char** names, int64_t* flags, const int n);
+// int test_exported_stream(struct ArrowArrayStream* stream);
import "C"
import (
+ "fmt"
"unsafe"
"github.com/apache/arrow/go/v11/arrow"
@@ -264,3 +266,14 @@ func arrayStreamTest() *CArrowArrayStream {
C.setup_array_stream_test(2, st)
return st
}
+
+func exportedStreamTest(reader array.RecordReader) error {
+ out := C.get_test_stream()
+ ExportRecordReader(reader, out)
+ rc := C.test_exported_stream(out)
+ C.free(unsafe.Pointer(out))
+ if rc == 0 {
+ return nil
+ }
+ return fmt.Errorf("Exported stream test failed with return code %d", int(rc))
+}