Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/22 20:03:52 UTC
[arrow] branch master updated: ARROW-5493: [Go][Integration] add Go support for IPC integration tests
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba2763 ARROW-5493: [Go][Integration] add Go support for IPC integration tests
4ba2763 is described below
commit 4ba2763150459c9eb4139e5954d9b5526b8ef0ee
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sat Jun 22 15:03:45 2019 -0500
ARROW-5493: [Go][Integration] add Go support for IPC integration tests
Author: Sebastien Binet <bi...@cern.ch>
Closes #4565 from sbinet/issue-5493 and squashes the following commits:
0385a98b5 <Sebastien Binet> go/arrow/ipc/cmd/arrow-stream-to-file: cleanup
e2d5a5018 <Sebastien Binet> go/arrow/internal/arrdata: remove spurious os.File.Sync on r/o file
1823cec7d <Sebastien Binet> ci: run integration tests as part of Go changes
fc31f3e65 <Sebastien Binet> go/arrow/ipc/cmd/arrow-json-integration-test: add unit test similar to test_integration.py
d0f14dba6 <Sebastien Binet> go/arrow/{arrio,internal/arrdata}: export {Check,Write}{File,Stream} helper functions
a0ec358de <Sebastien Binet> go/arrow{ipc,internal/arrjson}: inherit nullable attribute for {FixedSized}List.Item element
c9132f7b7 <Sebastien Binet> go/arrow/ipc: do not encode Timestamp.TimeZone when empty
8ba7a7556 <Sebastien Binet> go/arrow: handle FixedSizeList array with no offsets
10c08bb31 <Sebastien Binet> integration: handle no GOPATH case
93d499106 <Sebastien Binet> go/arrow: display TimeZone as part of TimestampType Stringer implementation
b82ed9731 <Sebastien Binet> go/arrow/ipc: handle empty streams
90d79fdc3 <Sebastien Binet> integration: skip known-to-fail Go tests
4bc504d50 <Sebastien Binet> ci: add Go-1.12.x to integration build
9f2e4662a <Sebastien Binet> integration: linting
08e51cc2a <Sebastien Binet> ci,integration: adapt for GO111MODULE=on
73ec80ce3 <Sebastien Binet> dev: add Go tests for release-candidate
51c6a9ff8 <Sebastien Binet> ci: build Go binaries for integration tests
c1b8586d4 <Sebastien Binet> go/arrow/{,ipc,internal/arrdata}: add tests for all Timestamps (s,ms,us,ns)
ae57bac2f <Sebastien Binet> go/arrow/internal/arrjson: implement r/w JSON for Interval and Duration arrays
e5542d541 <Sebastien Binet> go: introduce arrow/arrio package, mirroring stdlib io package
ff953f77c <Sebastien Binet> go: use array.RecordEqual
e8d4de7e1 <Sebastien Binet> go/arrow/internal/arrjson: implement r/w roundtrip
094e9d00f <Sebastien Binet> go/arrow/internal/arrjson: handle hex-string decoding for FixedSizeBinary
043d53763 <Sebastien Binet> go/arrow/array: fixed sub-builders of FixedSizeListBuilder ref-count
b30dc7886 <Sebastien Binet> go/arrow/ipc/cmd/arrow-json-integration-test: first import
16e13e2ba <Sebastien Binet> go/arrow/internal/arrjson: first import
de2ce54e9 <Sebastien Binet> go/arrow/ipc/cmd/arrow-{file,stream}-to-{stream,file}: first import
d3e51f335 <Sebastien Binet> go/arrow/ipc: introduce Record{Reader{,At},Writer} interfaces and Copy{,N} functions
ee4c500c6 <Sebastien Binet> ipc/cmd/arrow-{cat,ls}: simplify memory allocator
d9f077b25 <Sebastien Binet> ARROW-5493: add Go support for IPC integration tests
---
ci/detect-changes.py                            |    1 +
ci/travis_env_common.sh                         |    1 +
ci/travis_script_integration.sh                 |   14 +
dev/release/verify-release-candidate.sh         |   13 +
go/arrow/array/array.go                         |    6 +-
go/arrow/array/array_test.go                    |    2 +-
go/arrow/array/fixed_size_list.go               |   63 +-
go/arrow/array/fixed_size_list_test.go          |   12 +-
go/arrow/arrio/arrio.go                         |   91 ++
go/arrow/arrio/arrio_test.go                    |  201 +++
go/arrow/datatype_fixedwidth.go                 |   27 +-
go/arrow/example_test.go                        |   29 +-
go/arrow/go.mod                                 |    2 +
go/arrow/internal/arrdata/arrdata.go            |   77 +-
go/arrow/internal/arrdata/ioutil.go             |  143 ++
go/arrow/internal/arrjson/arrjson.go            | 1464 ++++++++++++++++++++
go/arrow/internal/arrjson/arrjson_test.go       |  110 ++
.../{ipc/ipc.go => internal/arrjson/option.go}  |   48 +-
go/arrow/internal/arrjson/reader.go             |  100 ++
go/arrow/internal/arrjson/writer.go             |  110 ++
go/arrow/ipc/cmd/arrow-cat/main.go              |    7 +-
go/arrow/ipc/cmd/arrow-cat/main_test.go         |   81 +-
go/arrow/ipc/cmd/arrow-file-to-stream/main.go   |   83 ++
.../arrow-file-to-stream/main_test.go}          |   56 +-
.../ipc/cmd/arrow-json-integration-test/main.go |  226 +++
.../arrow-json-integration-test/main_test.go}   |   78 +-
go/arrow/ipc/cmd/arrow-ls/main.go               |    6 +-
go/arrow/ipc/cmd/arrow-ls/main_test.go          |    7 +-
go/arrow/ipc/cmd/arrow-stream-to-file/main.go   |   71 +
.../arrow-stream-to-file/main_test.go}          |   62 +-
go/arrow/ipc/file_reader.go                     |   25 +-
go/arrow/ipc/file_test.go                       |   60 +-
go/arrow/ipc/ipc.go                             |   10 +
go/arrow/ipc/ipc_test.go                        |   55 -
go/arrow/ipc/metadata.go                        |   14 +-
go/arrow/ipc/reader.go                          |   18 +
go/arrow/ipc/stream_test.go                     |   39 +-
go/arrow/ipc/writer.go                          |   61 +-
integration/integration_test.py                 |   75 +
39 files changed, 3055 insertions(+), 493 deletions(-)
diff --git a/ci/detect-changes.py b/ci/detect-changes.py
index 1a30eb5..9194502 100644
--- a/ci/detect-changes.py
+++ b/ci/detect-changes.py
@@ -141,6 +141,7 @@ AFFECTED_DEPENDENCIES = {
'ci': ALL_TOPICS,
'cpp': ['python', 'c_glib', 'r', 'ruby', 'integration'],
'format': LANGUAGE_TOPICS,
+ 'go': ['integration'],
'.travis.yml': ALL_TOPICS,
'c_glib': ['ruby']
}
diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh
index 922b37a..453cd94 100755
--- a/ci/travis_env_common.sh
+++ b/ci/travis_env_common.sh
@@ -38,6 +38,7 @@ export ARROW_INTEGRATION_DIR=$TRAVIS_BUILD_DIR/integration
export ARROW_DEV_DIR=$TRAVIS_BUILD_DIR/dev
export ARROW_CROSSBOW_DIR=$TRAVIS_BUILD_DIR/dev/tasks
export ARROW_RUBY_DIR=$TRAVIS_BUILD_DIR/ruby
+export ARROW_GO_DIR=${TRAVIS_BUILD_DIR}/go
export ARROW_RUST_DIR=${TRAVIS_BUILD_DIR}/rust
export ARROW_R_DIR=${TRAVIS_BUILD_DIR}/r
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index c51005d..c043189 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -40,6 +40,20 @@ npm run build -- -t apache-arrow
popd
+pushd $ARROW_GO_DIR/arrow
+
+eval $(gimme 1.12.x)
+
+export GO111MODULE=on
+export GOBIN=`pwd`/bin
+
+which go
+go version
+go env
+go get -v ./...
+
+popd
+
pushd $ARROW_INTEGRATION_DIR
conda activate $CPP_TOOLCHAIN
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index 80daa42..f14c99d 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -370,6 +370,15 @@ test_rust() {
popd
}
+test_go() {
+ pushd go
+
+ go get -v ./...
+ go test ./...
+
+ popd
+}
+
# Build and test Java (Requires newer Maven -- I used 3.3.9)
test_package_java() {
@@ -424,6 +433,7 @@ import_gpg_keys
: ${TEST_PYTHON:=${TEST_DEFAULT}}
: ${TEST_JS:=${TEST_DEFAULT}}
: ${TEST_INTEGRATION:=${TEST_DEFAULT}}
+: ${TEST_GO:=${TEST_DEFAULT}}
: ${TEST_RUST:=${TEST_DEFAULT}}
: ${TEST_BINARY:=${TEST_DEFAULT}}
: ${TEST_APT:=${TEST_DEFAULT}}
@@ -472,6 +482,9 @@ if [ "$ARTIFACT" == "source" ]; then
if [ ${TEST_INTEGRATION} -gt 0 ]; then
test_integration
fi
+ if [ ${TEST_GO} -gt 0 ]; then
+ test_go
+ fi
if [ ${TEST_RUST} -gt 0 ]; then
test_rust
fi
diff --git a/go/arrow/array/array.go b/go/arrow/array/array.go
index d8418f8..c357733 100644
--- a/go/arrow/array/array.go
+++ b/go/arrow/array/array.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package array
+package array // import "github.com/apache/arrow/go/arrow/array"
import (
"sync/atomic"
@@ -130,6 +130,10 @@ func (a *array) setData(data *Data) {
a.data = data
}
+func (a *array) Offset() int {
+ return a.data.Offset()
+}
+
type arrayConstructorFn func(*Data) Interface
var (
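The new Offset accessor surfaces the underlying data offset of a (possibly sliced) array, which the JSON writer introduced further below needs. A minimal sketch of what it reports, with hypothetical values:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	bldr := array.NewInt64Builder(memory.NewGoAllocator())
	defer bldr.Release()

	bldr.AppendValues([]int64{1, 2, 3, 4, 5}, nil)

	arr := bldr.NewInt64Array()
	defer arr.Release()

	// NewSlice shares the parent buffers and records where the view starts.
	sli := array.NewSlice(arr, 2, 4).(*array.Int64)
	defer sli.Release()

	fmt.Println(arr.Offset(), sli.Offset()) // 0 2
}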
diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index ba3a961..590fa3c 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -79,7 +79,7 @@ func TestMakeFromData(t *testing.T) {
array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
}},
- {name: "fixed_size_list", d: &testDataType{arrow.FIXED_SIZE_LIST}, child: []*array.Data{
+ {name: "fixed_size_list", d: arrow.FixedSizeListOf(4, arrow.PrimitiveTypes.Int64), child: []*array.Data{
array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
}},
diff --git a/go/arrow/array/fixed_size_list.go b/go/arrow/array/fixed_size_list.go
index 1145e4e..6c61513 100644
--- a/go/arrow/array/fixed_size_list.go
+++ b/go/arrow/array/fixed_size_list.go
@@ -30,9 +30,8 @@ import (
// FixedSizeList represents an immutable sequence of N array values.
type FixedSizeList struct {
array
- n int32
- values Interface
- offsets []int32
+ n int32
+ values Interface
}
// NewFixedSizeListData returns a new List array value, from data.
@@ -65,18 +64,17 @@ func (a *FixedSizeList) String() string {
}
func (a *FixedSizeList) newListValue(i int) Interface {
- j := i + a.array.data.offset
- beg := int64(a.offsets[j])
- end := int64(a.offsets[j+1])
- return NewSlice(a.values, beg, end)
+ n := int64(a.n)
+ off := int64(a.array.data.offset)
+ beg := (off + int64(i)) * n
+ end := (off + int64(i+1)) * n
+ sli := NewSlice(a.values, beg, end)
+ return sli
}
func (a *FixedSizeList) setData(data *Data) {
a.array.setData(data)
- vals := data.buffers[1]
- if vals != nil {
- a.offsets = arrow.Int32Traits.CastFromBytes(vals.Bytes())
- }
+ a.n = a.DataType().(*arrow.FixedSizeListType).Len()
a.values = MakeFromData(data.childData[0])
}
@@ -102,8 +100,6 @@ func arrayEqualFixedSizeList(left, right *FixedSizeList) bool {
// Len returns the number of elements in the array.
func (a *FixedSizeList) Len() int { return a.array.Len() }
-func (a *FixedSizeList) Offsets() []int32 { return a.offsets }
-
func (a *FixedSizeList) Retain() {
a.array.Retain()
a.values.Retain()
@@ -117,10 +113,9 @@ func (a *FixedSizeList) Release() {
type FixedSizeListBuilder struct {
builder
- etype arrow.DataType // data type of the list's elements.
- n int32 // number of elements in the fixed-size list.
- values Builder // value builder for the list's elements.
- offsets *Int32Builder
+ etype arrow.DataType // data type of the list's elements.
+ n int32 // number of elements in the fixed-size list.
+ values Builder // value builder for the list's elements.
}
// NewFixedSizeListBuilder returns a builder, using the provided memory allocator.
@@ -131,7 +126,6 @@ func NewFixedSizeListBuilder(mem memory.Allocator, n int32, etype arrow.DataType
etype: etype,
n: n,
values: newBuilder(mem, etype),
- offsets: NewInt32Builder(mem),
}
}
@@ -145,31 +139,25 @@ func (b *FixedSizeListBuilder) Release() {
b.nullBitmap.Release()
b.nullBitmap = nil
}
+ if b.values != nil {
+ b.values.Release()
+ b.values = nil
+ }
}
-
- b.values.Release()
- b.offsets.Release()
-}
-
-func (b *FixedSizeListBuilder) appendNextOffset() {
- b.offsets.Append(int32(b.values.Len()))
}
func (b *FixedSizeListBuilder) Append(v bool) {
b.Reserve(1)
b.unsafeAppendBoolToBitmap(v)
- b.appendNextOffset()
}
func (b *FixedSizeListBuilder) AppendNull() {
b.Reserve(1)
b.unsafeAppendBoolToBitmap(false)
- b.appendNextOffset()
}
-func (b *FixedSizeListBuilder) AppendValues(offsets []int32, valid []bool) {
+func (b *FixedSizeListBuilder) AppendValues(valid []bool) {
b.Reserve(len(valid))
- b.offsets.AppendValues(offsets, nil)
b.builder.unsafeAppendBoolsToBitmap(valid, len(valid))
}
@@ -189,7 +177,6 @@ func (b *FixedSizeListBuilder) unsafeAppendBoolToBitmap(isValid bool) {
func (b *FixedSizeListBuilder) init(capacity int) {
b.builder.init(capacity)
- b.offsets.init(capacity + 1)
}
// Reserve ensures there is enough space for appending n elements
@@ -209,7 +196,6 @@ func (b *FixedSizeListBuilder) Resize(n int) {
b.init(n)
} else {
b.builder.resize(n, b.builder.init)
- b.offsets.resize(n+1, b.offsets.init)
}
}
@@ -226,9 +212,6 @@ func (b *FixedSizeListBuilder) NewArray() Interface {
// NewListArray creates a List array from the memory buffers used by the builder and resets the FixedSizeListBuilder
// so it can be used to build a new array.
func (b *FixedSizeListBuilder) NewListArray() (a *FixedSizeList) {
- if b.offsets.Len() != b.length+1 {
- b.appendNextOffset()
- }
data := b.newData()
a = NewFixedSizeListData(data)
data.Release()
@@ -239,19 +222,9 @@ func (b *FixedSizeListBuilder) newData() (data *Data) {
values := b.values.NewArray()
defer values.Release()
- var offsets *memory.Buffer
- if b.offsets != nil {
- arr := b.offsets.NewInt32Array()
- defer arr.Release()
- offsets = arr.Data().buffers[1]
- }
-
data = NewData(
arrow.FixedSizeListOf(b.n, b.etype), b.length,
- []*memory.Buffer{
- b.nullBitmap,
- offsets,
- },
+ []*memory.Buffer{b.nullBitmap},
[]*Data{values.Data()},
b.nulls,
0,
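With the offsets buffer gone, element positions in the child array are computed from the fixed size n alone, and the builder loses its offsets parameter. A minimal sketch against the new API, with hypothetical values; note that a null list still occupies n slots in the child array, as the updated example_test.go below also shows:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	lb := array.NewFixedSizeListBuilder(mem, 3, arrow.PrimitiveTypes.Int64)
	defer lb.Release()
	vb := lb.ValueBuilder().(*array.Int64Builder)

	lb.Append(true)
	vb.AppendValues([]int64{0, 1, 2}, nil)

	lb.AppendNull() // a null list still consumes 3 slots in the child array
	vb.AppendValues([]int64{-1, -1, -1}, nil)

	arr := lb.NewListArray()
	defer arr.Release()

	fmt.Println(arr) // [[0 1 2] (null)]
}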
diff --git a/go/arrow/array/fixed_size_list_test.go b/go/arrow/array/fixed_size_list_test.go
index afbf9e8..d72bc0b 100644
--- a/go/arrow/array/fixed_size_list_test.go
+++ b/go/arrow/array/fixed_size_list_test.go
@@ -33,7 +33,6 @@ func TestFixedSizeListArray(t *testing.T) {
vs = []int32{0, 1, 2, 3, 4, 5, 6}
lengths = []int{3, 0, 4}
isValid = []bool{true, false, true}
- offsets = []int32{0, 3, 3, 7}
)
lb := array.NewFixedSizeListBuilder(pool, int32(len(vs)), arrow.PrimitiveTypes.Int32)
@@ -75,10 +74,6 @@ func TestFixedSizeListArray(t *testing.T) {
}
}
- if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) {
- t.Fatalf("got=%v, want=%v", got, want)
- }
-
varr := arr.ListValues().(*array.Int32)
if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) {
t.Fatalf("got=%v, want=%v", got, want)
@@ -107,7 +102,6 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
vs = []int32{0, 1, 2, 3, 4, 5, 6}
lengths = []int{3, 0, 4}
isValid = []bool{true, false, true}
- offsets = []int32{0, 3, 3, 7}
)
lb := array.NewFixedSizeListBuilder(pool, int32(len(vs)), arrow.PrimitiveTypes.Int32)
@@ -115,7 +109,7 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
vb := lb.ValueBuilder().(*array.Int32Builder)
vb.Reserve(len(vs))
- lb.AppendValues(offsets, isValid)
+ lb.AppendValues(isValid)
for _, v := range vs {
vb.Append(v)
}
@@ -140,10 +134,6 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
}
}
- if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) {
- t.Fatalf("got=%v, want=%v", got, want)
- }
-
varr := arr.ListValues().(*array.Int32)
if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) {
t.Fatalf("got=%v, want=%v", got, want)
diff --git a/go/arrow/arrio/arrio.go b/go/arrow/arrio/arrio.go
new file mode 100644
index 0000000..60cfb78
--- /dev/null
+++ b/go/arrow/arrio/arrio.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package arrio exposes functions to manipulate records, using interfaces
+// similar to the ones defined in the stdlib io package.
+package arrio // import "github.com/apache/arrow/go/arrow/arrio"
+
+import (
+ "io"
+
+ "github.com/apache/arrow/go/arrow/array"
+)
+
+// Reader is the interface that wraps the Read method.
+type Reader interface {
+ // Read reads the current record from the underlying stream and returns it, along with an error, if any.
+ // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+ Read() (array.Record, error)
+}
+
+// ReaderAt is the interface that wraps the ReadAt method.
+type ReaderAt interface {
+ // ReadAt reads the i-th record from the underlying stream and returns it, along with an error, if any.
+ ReadAt(i int64) (array.Record, error)
+}
+
+// Writer is the interface that wraps the Write method.
+type Writer interface {
+ Write(rec array.Record) error
+}
+
+// Copy copies all the records available from src to dst.
+// Copy returns the number of records copied and the first error
+// encountered while copying, if any.
+//
+// A successful Copy returns err == nil, not err == EOF. Because Copy is
+// defined to read from src until EOF, it does not treat an EOF from Read as an
+// error to be reported.
+func Copy(dst Writer, src Reader) (n int64, err error) {
+ for {
+ rec, err := src.Read()
+ if err != nil {
+ if err == io.EOF {
+ return n, nil
+ }
+ return n, err
+ }
+ err = dst.Write(rec)
+ if err != nil {
+ return n, err
+ }
+ n++
+ }
+}
+
+// CopyN copies n records (or until an error) from src to dst. It returns the
+// number of records copied and the earliest error encountered while copying. On
+// return, written == n if and only if err == nil.
+func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
+ for ; written < n; written++ {
+ rec, err := src.Read()
+ if err != nil {
+ if err == io.EOF && written == n {
+ return written, nil
+ }
+ return written, err
+ }
+ err = dst.Write(rec)
+ if err != nil {
+ return written, err
+ }
+ }
+
+ if written != n && err == nil {
+ err = io.EOF
+ }
+ return written, err
+}
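The ipc readers and writers satisfy these interfaces, so converting between the stream and file formats becomes a single arrio.Copy call; the arrow-stream-to-file and arrow-file-to-stream commands added below are built this way. A minimal sketch that pipes an ARROW stream from stdin into a file (the out.arrow path is hypothetical):

package main

import (
	"log"
	"os"

	"github.com/apache/arrow/go/arrow/arrio"
	"github.com/apache/arrow/go/arrow/ipc"
)

func main() {
	r, err := ipc.NewReader(os.Stdin)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Release()

	f, err := os.Create("out.arrow")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	w, err := ipc.NewFileWriter(f, ipc.WithSchema(r.Schema()))
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// ipc.Reader and ipc.FileWriter satisfy arrio.Reader and arrio.Writer.
	n, err := arrio.Copy(w, r)
	if err != nil {
		log.Fatalf("could not copy ARROW stream: %v", err)
	}
	log.Printf("copied %d records", n)
}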
diff --git a/go/arrow/arrio/arrio_test.go b/go/arrow/arrio/arrio_test.go
new file mode 100644
index 0000000..74f7f73
--- /dev/null
+++ b/go/arrow/arrio/arrio_test.go
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrio_test
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/internal/arrdata"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+)
+
+type copyKind int
+
+const (
+ fileKind copyKind = iota
+ streamKind
+)
+
+func (k copyKind) write(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ switch k {
+ case fileKind:
+ arrdata.WriteFile(t, f, mem, schema, recs)
+ case streamKind:
+ arrdata.WriteStream(t, f, mem, schema, recs)
+ default:
+ panic("invalid copyKind")
+ }
+}
+
+func (k copyKind) check(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ switch k {
+ case fileKind:
+ arrdata.CheckArrowFile(t, f, mem, schema, recs)
+ case streamKind:
+ arrdata.CheckArrowStream(t, f, mem, schema, recs)
+ default:
+ panic("invalid copyKind")
+ }
+}
+
+func TestCopy(t *testing.T) {
+ type kind int
+
+ for _, tc := range []struct {
+ name string
+ src, dst copyKind
+ }{
+ {name: "file2file", src: fileKind, dst: fileKind},
+ {name: "file2stream", src: fileKind, dst: streamKind},
+ {name: "stream2file", src: streamKind, dst: fileKind},
+ {name: "stream2stream", src: streamKind, dst: streamKind},
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ for name, recs := range arrdata.Records {
+ t.Run(name, func(t *testing.T) {
+ for _, tcopy := range []struct {
+ n int
+ want int
+ err error
+ }{
+ {-1, len(recs), nil},
+ {1, 1, nil},
+ {0, 0, nil},
+ {len(recs), len(recs), nil},
+ {len(recs) + 1, len(recs), io.EOF},
+ } {
+ t.Run(fmt.Sprintf("-copy-n=%d", tcopy.n), func(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ f, err := ioutil.TempFile("", "arrow-ipc-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ defer os.Remove(f.Name())
+
+ o, err := ioutil.TempFile("", "arrow-ipc-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer o.Close()
+ defer os.Remove(o.Name())
+
+ tc.src.write(t, f, mem, recs[0].Schema(), recs)
+ tc.src.check(t, f, mem, recs[0].Schema(), recs)
+
+ _, err = f.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var r arrio.Reader
+ switch tc.src {
+ case fileKind:
+ rr, err := ipc.NewFileReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer rr.Close()
+ r = rr
+ case streamKind:
+ rr, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer rr.Release()
+ r = rr
+ default:
+ t.Fatalf("invalid src type %v", tc.src)
+ }
+
+ var w interface {
+ arrio.Writer
+ io.Closer
+ }
+
+ switch tc.dst {
+ case fileKind:
+ w, err = ipc.NewFileWriter(o, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ case streamKind:
+ w = ipc.NewWriter(o, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+ default:
+ t.Fatalf("invalid dst type %v", tc.dst)
+ }
+ defer w.Close()
+
+ var (
+ n int64
+ )
+ switch tcopy.n {
+ case -1:
+ n, err = arrio.Copy(w, r)
+ case len(recs) + 1:
+ n, err = arrio.CopyN(w, r, int64(tcopy.n))
+ default:
+ n, err = arrio.CopyN(w, r, int64(tcopy.n))
+ }
+
+ switch err {
+ case nil:
+ if tcopy.err != nil {
+ t.Fatalf("got a nil error, want=%v", tcopy.err)
+ }
+ default:
+ switch tcopy.err {
+ case nil:
+ t.Fatalf("invalid error: got=%v, want=%v", err, tcopy.err)
+ default:
+ if tcopy.err.Error() != err.Error() {
+ t.Fatalf("invalid error: got=%v, want=%v", err, tcopy.err)
+ }
+ }
+ }
+
+ if got, want := n, int64(tcopy.want); got != want {
+ t.Fatalf("invalid number of records copied: got=%d, want=%d", got, want)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tc.dst.check(t, o, mem, recs[0].Schema(), recs[:tcopy.want])
+ })
+ }
+ })
+ }
+ })
+ }
+}
diff --git a/go/arrow/datatype_fixedwidth.go b/go/arrow/datatype_fixedwidth.go
index 5f63ea1..79bb878 100644
--- a/go/arrow/datatype_fixedwidth.go
+++ b/go/arrow/datatype_fixedwidth.go
@@ -69,9 +69,16 @@ type TimestampType struct {
TimeZone string
}
-func (*TimestampType) ID() Type { return TIMESTAMP }
-func (*TimestampType) Name() string { return "timestamp" }
-func (t *TimestampType) String() string { return "timestamp[" + t.Unit.String() + "]" }
+func (*TimestampType) ID() Type { return TIMESTAMP }
+func (*TimestampType) Name() string { return "timestamp" }
+func (t *TimestampType) String() string {
+ switch len(t.TimeZone) {
+ case 0:
+ return "timestamp[" + t.Unit.String() + "]"
+ default:
+ return "timestamp[" + t.Unit.String() + ", tz=" + t.TimeZone + "]"
+ }
+}
// BitWidth returns the number of bits required to store a single element of this data type in memory.
func (*TimestampType) BitWidth() int { return 64 }
@@ -146,8 +153,8 @@ func (t *MonthIntervalType) BitWidth() int { return 32 }
// DayTimeInterval represents a number of days and milliseconds (fraction of day).
type DayTimeInterval struct {
- Days int32
- Milliseconds int32
+ Days int32 `json:"days"`
+ Milliseconds int32 `json:"milliseconds"`
}
// DayTimeIntervalType is encoded as a pair of 32-bit signed integer,
@@ -177,7 +184,10 @@ var (
Time32ms FixedWidthDataType
Time64us FixedWidthDataType
Time64ns FixedWidthDataType
- Timestamp FixedWidthDataType
+ Timestamp_s FixedWidthDataType
+ Timestamp_ms FixedWidthDataType
+ Timestamp_us FixedWidthDataType
+ Timestamp_ns FixedWidthDataType
}{
Boolean: &BooleanType{},
Date32: &Date32Type{},
@@ -193,7 +203,10 @@ var (
Time32ms: &Time32Type{Unit: Millisecond},
Time64us: &Time64Type{Unit: Microsecond},
Time64ns: &Time64Type{Unit: Nanosecond},
- Timestamp: &TimestampType{Unit: Nanosecond, TimeZone: "UTC"},
+ Timestamp_s: &TimestampType{Unit: Second, TimeZone: "UTC"},
+ Timestamp_ms: &TimestampType{Unit: Millisecond, TimeZone: "UTC"},
+ Timestamp_us: &TimestampType{Unit: Microsecond, TimeZone: "UTC"},
+ Timestamp_ns: &TimestampType{Unit: Nanosecond, TimeZone: "UTC"},
}
_ FixedWidthDataType = (*FixedSizeBinaryType)(nil)
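A quick sketch of the new Stringer output (assuming the usual s/ms/us/ns rendering of arrow.TimeUnit):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow"
)

func main() {
	// TimeZone is now part of the string form, and omitted when empty.
	fmt.Println(arrow.FixedWidthTypes.Timestamp_s)             // timestamp[s, tz=UTC]
	fmt.Println(&arrow.TimestampType{Unit: arrow.Millisecond}) // timestamp[ms]
}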
diff --git a/go/arrow/example_test.go b/go/arrow/example_test.go
index de65d71..6413aee 100644
--- a/go/arrow/example_test.go
+++ b/go/arrow/example_test.go
@@ -224,6 +224,7 @@ func Example_fixedSizeListArray() {
vb.Append(2)
lb.AppendNull()
+ vb.AppendValues([]int64{-1, -1, -1}, nil)
lb.Append(true)
vb.Append(3)
@@ -242,41 +243,13 @@ func Example_fixedSizeListArray() {
fmt.Printf("NullN() = %d\n", arr.NullN())
fmt.Printf("Len() = %d\n", arr.Len())
- fmt.Printf("Offsets() = %v\n", arr.Offsets())
fmt.Printf("Type() = %v\n", arr.DataType())
-
- offsets := arr.Offsets()[1:]
-
- varr := arr.ListValues().(*array.Int64)
-
- pos := 0
- for i := 0; i < arr.Len(); i++ {
- if !arr.IsValid(i) {
- fmt.Printf("List[%d] = (null)\n", i)
- continue
- }
- fmt.Printf("List[%d] = [", i)
- for j := pos; j < int(offsets[i]); j++ {
- if j != pos {
- fmt.Printf(", ")
- }
- fmt.Printf("%v", varr.Value(j))
- }
- pos = int(offsets[i])
- fmt.Printf("]\n")
- }
fmt.Printf("List = %v\n", arr)
// Output:
// NullN() = 2
// Len() = 5
- // Offsets() = [0 3 3 6 9 9]
// Type() = fixed_size_list<item: int64>[3]
- // List[0] = [0, 1, 2]
- // List[1] = (null)
- // List[2] = [3, 4, 5]
- // List[3] = [6, 7, 8]
- // List[4] = (null)
// List = [[0 1 2] (null) [3 4 5] [6 7 8] (null)]
}
diff --git a/go/arrow/go.mod b/go/arrow/go.mod
index e46257c..fa09504 100644
--- a/go/arrow/go.mod
+++ b/go/arrow/go.mod
@@ -16,6 +16,8 @@
module github.com/apache/arrow/go/arrow
+go 1.12
+
require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/google/flatbuffers v1.11.0
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index 4216619..c249236 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -361,10 +361,16 @@ func makeStringsRecords() []array.Record {
return recs
}
-type time32s arrow.Time32
-type time32ms arrow.Time32
-type time64ns arrow.Time64
-type time64us arrow.Time64
+type (
+ time32s arrow.Time32
+ time32ms arrow.Time32
+ time64ns arrow.Time64
+ time64us arrow.Time64
+ timestamp_s arrow.Timestamp
+ timestamp_ms arrow.Timestamp
+ timestamp_us arrow.Timestamp
+ timestamp_ns arrow.Timestamp
+)
func makeFixedWidthTypesRecords() []array.Record {
mem := memory.NewGoAllocator()
@@ -375,7 +381,10 @@ func makeFixedWidthTypesRecords() []array.Record {
arrow.Field{Name: "time32s", Type: arrow.FixedWidthTypes.Time32s, Nullable: true},
arrow.Field{Name: "time64ns", Type: arrow.FixedWidthTypes.Time64ns, Nullable: true},
arrow.Field{Name: "time64us", Type: arrow.FixedWidthTypes.Time64us, Nullable: true},
- arrow.Field{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp, Nullable: true},
+ arrow.Field{Name: "timestamp_s", Type: arrow.FixedWidthTypes.Timestamp_s, Nullable: true},
+ arrow.Field{Name: "timestamp_ms", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true},
+ arrow.Field{Name: "timestamp_us", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
+ arrow.Field{Name: "timestamp_ns", Type: arrow.FixedWidthTypes.Timestamp_ns, Nullable: true},
arrow.Field{Name: "date32s", Type: arrow.FixedWidthTypes.Date32, Nullable: true},
arrow.Field{Name: "date64s", Type: arrow.FixedWidthTypes.Date64, Nullable: true},
}, nil,
@@ -397,7 +406,10 @@ func makeFixedWidthTypesRecords() []array.Record {
arrayOf(mem, []time32s{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []time64ns{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []time64us{-2, -1, 0, +1, +2}, mask),
- arrayOf(mem, []arrow.Timestamp{0, +1, +2, +3, +4}, mask),
+ arrayOf(mem, []timestamp_s{0, +1, +2, +3, +4}, mask),
+ arrayOf(mem, []timestamp_ms{0, +1, +2, +3, +4}, mask),
+ arrayOf(mem, []timestamp_us{0, +1, +2, +3, +4}, mask),
+ arrayOf(mem, []timestamp_ns{0, +1, +2, +3, +4}, mask),
arrayOf(mem, []arrow.Date32{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []arrow.Date64{-2, -1, 0, +1, +2}, mask),
},
@@ -407,7 +419,10 @@ func makeFixedWidthTypesRecords() []array.Record {
arrayOf(mem, []time32s{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []time64ns{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []time64us{-12, -11, 10, +11, +12}, mask),
- arrayOf(mem, []arrow.Timestamp{10, +11, +12, +13, +14}, mask),
+ arrayOf(mem, []timestamp_s{10, +11, +12, +13, +14}, mask),
+ arrayOf(mem, []timestamp_ms{10, +11, +12, +13, +14}, mask),
+ arrayOf(mem, []timestamp_us{10, +11, +12, +13, +14}, mask),
+ arrayOf(mem, []timestamp_ns{10, +11, +12, +13, +14}, mask),
arrayOf(mem, []arrow.Date32{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []arrow.Date64{-12, -11, 10, +11, +12}, mask),
},
@@ -417,7 +432,10 @@ func makeFixedWidthTypesRecords() []array.Record {
arrayOf(mem, []time32s{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []time64ns{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []time64us{-22, -21, 20, +21, +22}, mask),
- arrayOf(mem, []arrow.Timestamp{20, +21, +22, +23, +24}, mask),
+ arrayOf(mem, []timestamp_s{20, +21, +22, +23, +24}, mask),
+ arrayOf(mem, []timestamp_ms{20, +21, +22, +23, +24}, mask),
+ arrayOf(mem, []timestamp_us{20, +21, +22, +23, +24}, mask),
+ arrayOf(mem, []timestamp_ns{20, +21, +22, +23, +24}, mask),
arrayOf(mem, []arrow.Date32{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []arrow.Date64{-22, -21, 20, +21, +22}, mask),
},
@@ -782,11 +800,48 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
bldr.AppendValues(vs, valids)
return bldr.NewArray()
- case []arrow.Timestamp:
- bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp.(*arrow.TimestampType))
+ case []timestamp_s:
+ bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_s.(*arrow.TimestampType))
defer bldr.Release()
- bldr.AppendValues(a, valids)
+ vs := make([]arrow.Timestamp, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Timestamp(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []timestamp_ms:
+ bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_ms.(*arrow.TimestampType))
+ defer bldr.Release()
+
+ vs := make([]arrow.Timestamp, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Timestamp(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []timestamp_us:
+ bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_us.(*arrow.TimestampType))
+ defer bldr.Release()
+
+ vs := make([]arrow.Timestamp, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Timestamp(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []timestamp_ns:
+ bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_ns.(*arrow.TimestampType))
+ defer bldr.Release()
+
+ vs := make([]arrow.Timestamp, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Timestamp(v)
+ }
+ bldr.AppendValues(vs, valids)
return bldr.NewArray()
case []arrow.Date32:
diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
new file mode 100644
index 0000000..7065f64
--- /dev/null
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrdata // import "github.com/apache/arrow/go/arrow/internal/arrdata"
+
+import (
+ "io"
+ "os"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+)
+
+// CheckArrowFile checks whether a given ARROW file contains the expected list of records.
+func CheckArrowFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ _, err := f.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer r.Close()
+
+ for i := 0; i < r.NumRecords(); i++ {
+ rec, err := r.Record(i)
+ if err != nil {
+ t.Fatalf("could not read record %d: %v", i, err)
+ }
+ if !array.RecordEqual(rec, recs[i]) {
+ t.Fatalf("records[%d] differ", i)
+ }
+ }
+
+ err = r.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+// CheckArrowStream checks whether a given ARROW stream contains the expected list of records.
+func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ _, err := f.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ r, err := ipc.NewReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer r.Release()
+
+ n := 0
+ for r.Next() {
+ rec := r.Record()
+ if !array.RecordEqual(rec, recs[n]) {
+ t.Fatalf("records[%d] differ", n)
+ }
+ n++
+ }
+
+ if len(recs) != n {
+ t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
+
+ }
+}
+
+// WriteFile writes a list of records to the given file descriptor, as an ARROW file.
+func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ w, err := ipc.NewFileWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer w.Close()
+
+ for i, rec := range recs {
+ err = w.Write(rec)
+ if err != nil {
+ t.Fatalf("could not write record[%d]: %v", i, err)
+ }
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = f.Sync()
+ if err != nil {
+ t.Fatalf("could not sync data to disk: %v", err)
+ }
+
+ _, err = f.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatalf("could not seek to start: %v", err)
+ }
+}
+
+// WriteStream writes a list of records to the given file descriptor, as an ARROW stream.
+func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+ t.Helper()
+
+ w := ipc.NewWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+ defer w.Close()
+
+ for i, rec := range recs {
+ err := w.Write(rec)
+ if err != nil {
+ t.Fatalf("could not write record[%d]: %v", i, err)
+ }
+ }
+
+ err := w.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+}
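These helpers reduce an IPC round-trip test to two calls. A hypothetical test wiring them to the shared arrdata.Records fixtures:

package arrdata_test

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/apache/arrow/go/arrow/internal/arrdata"
	"github.com/apache/arrow/go/arrow/memory"
)

func TestFileRoundTrip(t *testing.T) {
	for name, recs := range arrdata.Records {
		t.Run(name, func(t *testing.T) {
			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
			defer mem.AssertSize(t, 0)

			f, err := ioutil.TempFile("", "arrow-ipc-")
			if err != nil {
				t.Fatal(err)
			}
			defer f.Close()
			defer os.Remove(f.Name())

			// Write all records as an ARROW file, then verify the round trip.
			arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
			arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), recs)
		})
	}
}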
diff --git a/go/arrow/internal/arrjson/arrjson.go b/go/arrow/internal/arrjson/arrjson.go
new file mode 100644
index 0000000..11cc3d0
--- /dev/null
+++ b/go/arrow/internal/arrjson/arrjson.go
@@ -0,0 +1,1464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package arrjson provides types and functions to encode and decode ARROW types and data
+// to and from JSON files.
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+ "encoding/hex"
+ "encoding/json"
+ "strconv"
+ "strings"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/float16"
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/pkg/errors"
+)
+
+const (
+ kData = "DATA"
+ kDays = "days"
+ kDayTime = "DAY_TIME"
+ kDuration = "duration"
+ kMilliseconds = "milliseconds"
+ kYearMonth = "YEAR_MONTH"
+)
+
+type Schema struct {
+ Fields []Field `json:"fields"`
+}
+
+type Field struct {
+ Name string `json:"name"`
+ Type dataType `json:"type"`
+ Nullable bool `json:"nullable"`
+ Children []Field `json:"children"`
+}
+
+type dataType struct {
+ Name string `json:"name"`
+ Signed bool `json:"isSigned,omitempty"`
+ BitWidth int `json:"bitWidth,omitempty"`
+ Precision string `json:"precision,omitempty"`
+ ByteWidth int `json:"byteWidth,omitempty"`
+ ListSize int32 `json:"listSize,omitempty"`
+ Unit string `json:"unit,omitempty"`
+ TimeZone string `json:"timezone,omitempty"`
+ Scale int `json:"scale,omitempty"` // for Decimal128
+}
+
+func dtypeToJSON(dt arrow.DataType) dataType {
+ switch dt := dt.(type) {
+ case *arrow.BooleanType:
+ return dataType{Name: "bool"}
+ case *arrow.Int8Type:
+ return dataType{Name: "int", Signed: true, BitWidth: 8}
+ case *arrow.Int16Type:
+ return dataType{Name: "int", Signed: true, BitWidth: 16}
+ case *arrow.Int32Type:
+ return dataType{Name: "int", Signed: true, BitWidth: 32}
+ case *arrow.Int64Type:
+ return dataType{Name: "int", Signed: true, BitWidth: 64}
+ case *arrow.Uint8Type:
+ return dataType{Name: "int", BitWidth: 8}
+ case *arrow.Uint16Type:
+ return dataType{Name: "int", BitWidth: 16}
+ case *arrow.Uint32Type:
+ return dataType{Name: "int", BitWidth: 32}
+ case *arrow.Uint64Type:
+ return dataType{Name: "int", BitWidth: 64}
+ case *arrow.Float16Type:
+ return dataType{Name: "floatingpoint", Precision: "HALF"}
+ case *arrow.Float32Type:
+ return dataType{Name: "floatingpoint", Precision: "SINGLE"}
+ case *arrow.Float64Type:
+ return dataType{Name: "floatingpoint", Precision: "DOUBLE"}
+ case *arrow.BinaryType:
+ return dataType{Name: "binary"}
+ case *arrow.StringType:
+ return dataType{Name: "utf8"}
+ case *arrow.Date32Type:
+ return dataType{Name: "date", Unit: "DAY"}
+ case *arrow.Date64Type:
+ return dataType{Name: "date", Unit: "MILLISECOND"}
+ case *arrow.Time32Type:
+ switch dt.Unit {
+ case arrow.Second:
+ return dataType{Name: "time", Unit: "SECOND", BitWidth: dt.BitWidth()}
+ case arrow.Millisecond:
+ return dataType{Name: "time", Unit: "MILLISECOND", BitWidth: dt.BitWidth()}
+ }
+ case *arrow.Time64Type:
+ switch dt.Unit {
+ case arrow.Microsecond:
+ return dataType{Name: "time", Unit: "MICROSECOND", BitWidth: dt.BitWidth()}
+ case arrow.Nanosecond:
+ return dataType{Name: "time", Unit: "NANOSECOND", BitWidth: dt.BitWidth()}
+ }
+ case *arrow.TimestampType:
+ switch dt.Unit {
+ case arrow.Second:
+ return dataType{Name: "timestamp", Unit: "SECOND", TimeZone: dt.TimeZone}
+ case arrow.Millisecond:
+ return dataType{Name: "timestamp", Unit: "MILLISECOND", TimeZone: dt.TimeZone}
+ case arrow.Microsecond:
+ return dataType{Name: "timestamp", Unit: "MICROSECOND", TimeZone: dt.TimeZone}
+ case arrow.Nanosecond:
+ return dataType{Name: "timestamp", Unit: "NANOSECOND", TimeZone: dt.TimeZone}
+ }
+ case *arrow.MonthIntervalType:
+ return dataType{Name: "interval", Unit: "YEAR_MONTH"}
+ case *arrow.DayTimeIntervalType:
+ return dataType{Name: "interval", Unit: "DAY_TIME"}
+ case *arrow.DurationType:
+ switch dt.Unit {
+ case arrow.Second:
+ return dataType{Name: "duration", Unit: "SECOND"}
+ case arrow.Millisecond:
+ return dataType{Name: "duration", Unit: "MILLISECOND"}
+ case arrow.Microsecond:
+ return dataType{Name: "duration", Unit: "MICROSECOND"}
+ case arrow.Nanosecond:
+ return dataType{Name: "duration", Unit: "NANOSECOND"}
+ }
+
+ case *arrow.ListType:
+ return dataType{Name: "list"}
+ case *arrow.StructType:
+ return dataType{Name: "struct"}
+ case *arrow.FixedSizeListType:
+ return dataType{Name: "fixedsizelist", ListSize: dt.Len()}
+ case *arrow.FixedSizeBinaryType:
+ return dataType{
+ Name: "fixedsizebinary",
+ ByteWidth: dt.ByteWidth,
+ }
+ }
+ panic(errors.Errorf("unknown arrow.DataType %v", dt))
+}
+
+func dtypeFromJSON(dt dataType, children []Field) arrow.DataType {
+ switch dt.Name {
+ case "bool":
+ return arrow.FixedWidthTypes.Boolean
+ case "int":
+ switch dt.Signed {
+ case true:
+ switch dt.BitWidth {
+ case 8:
+ return arrow.PrimitiveTypes.Int8
+ case 16:
+ return arrow.PrimitiveTypes.Int16
+ case 32:
+ return arrow.PrimitiveTypes.Int32
+ case 64:
+ return arrow.PrimitiveTypes.Int64
+ }
+ default:
+ switch dt.BitWidth {
+ case 8:
+ return arrow.PrimitiveTypes.Uint8
+ case 16:
+ return arrow.PrimitiveTypes.Uint16
+ case 32:
+ return arrow.PrimitiveTypes.Uint32
+ case 64:
+ return arrow.PrimitiveTypes.Uint64
+ }
+ }
+ case "floatingpoint":
+ switch dt.Precision {
+ case "HALF":
+ return arrow.FixedWidthTypes.Float16
+ case "SINGLE":
+ return arrow.PrimitiveTypes.Float32
+ case "DOUBLE":
+ return arrow.PrimitiveTypes.Float64
+ }
+ case "binary":
+ return arrow.BinaryTypes.Binary
+ case "utf8":
+ return arrow.BinaryTypes.String
+ case "date":
+ switch dt.Unit {
+ case "DAY":
+ return arrow.FixedWidthTypes.Date32
+ case "MILLISECOND":
+ return arrow.FixedWidthTypes.Date64
+ }
+ case "time":
+ switch dt.BitWidth {
+ case 32:
+ switch dt.Unit {
+ case "SECOND":
+ return arrow.FixedWidthTypes.Time32s
+ case "MILLISECOND":
+ return arrow.FixedWidthTypes.Time32ms
+ }
+ case 64:
+ switch dt.Unit {
+ case "MICROSECOND":
+ return arrow.FixedWidthTypes.Time64us
+ case "NANOSECOND":
+ return arrow.FixedWidthTypes.Time64ns
+ }
+ }
+ case "timestamp":
+ switch dt.Unit {
+ case "SECOND":
+ return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Second}
+ case "MILLISECOND":
+ return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Millisecond}
+ case "MICROSECOND":
+ return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Microsecond}
+ case "NANOSECOND":
+ return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Nanosecond}
+ }
+ case "list":
+ return arrow.ListOf(dtypeFromJSON(children[0].Type, nil))
+ case "struct":
+ return arrow.StructOf(fieldsFromJSON(children)...)
+ case "fixedsizebinary":
+ return &arrow.FixedSizeBinaryType{ByteWidth: dt.ByteWidth}
+ case "fixedsizelist":
+ return arrow.FixedSizeListOf(dt.ListSize, dtypeFromJSON(children[0].Type, nil))
+ case "interval":
+ switch dt.Unit {
+ case "YEAR_MONTH":
+ return arrow.FixedWidthTypes.MonthInterval
+ case "DAY_TIME":
+ return arrow.FixedWidthTypes.DayTimeInterval
+ }
+ case "duration":
+ switch dt.Unit {
+ case "SECOND":
+ return arrow.FixedWidthTypes.Duration_s
+ case "MILLISECOND":
+ return arrow.FixedWidthTypes.Duration_ms
+ case "MICROSECOND":
+ return arrow.FixedWidthTypes.Duration_us
+ case "NANOSECOND":
+ return arrow.FixedWidthTypes.Duration_ns
+ }
+ }
+ panic(errors.Errorf("unknown DataType %#v", dt))
+}
+
+func schemaToJSON(schema *arrow.Schema) Schema {
+ return Schema{
+ Fields: fieldsToJSON(schema.Fields()),
+ }
+}
+
+func schemaFromJSON(schema Schema) *arrow.Schema {
+ return arrow.NewSchema(fieldsFromJSON(schema.Fields), nil)
+}
+
+func fieldsToJSON(fields []arrow.Field) []Field {
+ o := make([]Field, len(fields))
+ for i, f := range fields {
+ o[i] = Field{
+ Name: f.Name,
+ Type: dtypeToJSON(f.Type),
+ Nullable: f.Nullable,
+ Children: []Field{},
+ }
+ switch dt := f.Type.(type) {
+ case *arrow.ListType:
+ o[i].Children = fieldsToJSON([]arrow.Field{{Name: "item", Type: dt.Elem(), Nullable: f.Nullable}})
+ case *arrow.FixedSizeListType:
+ o[i].Children = fieldsToJSON([]arrow.Field{{Name: "item", Type: dt.Elem(), Nullable: f.Nullable}})
+ case *arrow.StructType:
+ o[i].Children = fieldsToJSON(dt.Fields())
+ }
+ }
+ return o
+}
+
+func fieldsFromJSON(fields []Field) []arrow.Field {
+ vs := make([]arrow.Field, len(fields))
+ for i, v := range fields {
+ vs[i] = fieldFromJSON(v)
+ }
+ return vs
+}
+
+func fieldFromJSON(f Field) arrow.Field {
+ return arrow.Field{
+ Name: f.Name,
+ Type: dtypeFromJSON(f.Type, f.Children),
+ Nullable: f.Nullable,
+ }
+}
+
+type Record struct {
+ Count int64 `json:"count"`
+ Columns []Array `json:"columns"`
+}
+
+func recordsFromJSON(mem memory.Allocator, schema *arrow.Schema, recs []Record) []array.Record {
+ vs := make([]array.Record, len(recs))
+ for i, rec := range recs {
+ vs[i] = recordFromJSON(mem, schema, rec)
+ }
+ return vs
+}
+
+func recordFromJSON(mem memory.Allocator, schema *arrow.Schema, rec Record) array.Record {
+ arrs := arraysFromJSON(mem, schema, rec.Columns)
+ defer func() {
+ for _, arr := range arrs {
+ arr.Release()
+ }
+ }()
+ return array.NewRecord(schema, arrs, int64(rec.Count))
+}
+
+func recordToJSON(rec array.Record) Record {
+ return Record{
+ Count: rec.NumRows(),
+ Columns: arraysToJSON(rec.Schema(), rec.Columns()),
+ }
+}
+
+type Array struct {
+ Name string `json:"name"`
+ Count int `json:"count"`
+ Valids []int `json:"VALIDITY,omitempty"`
+ Data []interface{} `json:"DATA,omitempty"`
+ Offset []int32 `json:"OFFSET,omitempty"`
+ Children []Array `json:"children,omitempty"`
+}
+
+func arraysFromJSON(mem memory.Allocator, schema *arrow.Schema, arrs []Array) []array.Interface {
+ o := make([]array.Interface, len(arrs))
+ for i, v := range arrs {
+ o[i] = arrayFromJSON(mem, schema.Field(i).Type, v)
+ }
+ return o
+}
+
+func arraysToJSON(schema *arrow.Schema, arrs []array.Interface) []Array {
+ o := make([]Array, len(arrs))
+ for i, v := range arrs {
+ o[i] = arrayToJSON(schema.Field(i), v)
+ }
+ return o
+}
+
+func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) array.Interface {
+ switch dt := dt.(type) {
+ case *arrow.BooleanType:
+ bldr := array.NewBooleanBuilder(mem)
+ defer bldr.Release()
+ data := boolsFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Int8Type:
+ bldr := array.NewInt8Builder(mem)
+ defer bldr.Release()
+ data := i8FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Int16Type:
+ bldr := array.NewInt16Builder(mem)
+ defer bldr.Release()
+ data := i16FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Int32Type:
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ data := i32FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Int64Type:
+ bldr := array.NewInt64Builder(mem)
+ defer bldr.Release()
+ data := i64FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Uint8Type:
+ bldr := array.NewUint8Builder(mem)
+ defer bldr.Release()
+ data := u8FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Uint16Type:
+ bldr := array.NewUint16Builder(mem)
+ defer bldr.Release()
+ data := u16FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Uint32Type:
+ bldr := array.NewUint32Builder(mem)
+ defer bldr.Release()
+ data := u32FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Uint64Type:
+ bldr := array.NewUint64Builder(mem)
+ defer bldr.Release()
+ data := u64FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Float16Type:
+ bldr := array.NewFloat16Builder(mem)
+ defer bldr.Release()
+ data := f16FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Float32Type:
+ bldr := array.NewFloat32Builder(mem)
+ defer bldr.Release()
+ data := f32FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Float64Type:
+ bldr := array.NewFloat64Builder(mem)
+ defer bldr.Release()
+ data := f64FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.StringType:
+ bldr := array.NewStringBuilder(mem)
+ defer bldr.Release()
+ data := strFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.BinaryType:
+ bldr := array.NewBinaryBuilder(mem, dt)
+ defer bldr.Release()
+ data := bytesFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.ListType:
+ bldr := array.NewListBuilder(mem, dt.Elem())
+ defer bldr.Release()
+ valids := validsFromJSON(arr.Valids)
+ elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0])
+ defer elems.Release()
+ for i, v := range valids {
+ bldr.Append(v)
+ beg := int64(arr.Offset[i])
+ end := int64(arr.Offset[i+1])
+ slice := array.NewSlice(elems, beg, end)
+ buildArray(bldr.ValueBuilder(), slice)
+ slice.Release()
+ }
+ return bldr.NewArray()
+
+ case *arrow.FixedSizeListType:
+ bldr := array.NewFixedSizeListBuilder(mem, dt.Len(), dt.Elem())
+ defer bldr.Release()
+ valids := validsFromJSON(arr.Valids)
+ elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0])
+ defer elems.Release()
+ size := int64(dt.Len())
+ for i, v := range valids {
+ bldr.Append(v)
+ beg := int64(i) * size
+ end := int64(i+1) * size
+ slice := array.NewSlice(elems, beg, end)
+ buildArray(bldr.ValueBuilder(), slice)
+ slice.Release()
+ }
+ return bldr.NewArray()
+
+ case *arrow.StructType:
+ bldr := array.NewStructBuilder(mem, dt)
+ defer bldr.Release()
+ valids := validsFromJSON(arr.Valids)
+ fields := make([]array.Interface, len(dt.Fields()))
+ for i := range fields {
+ fields[i] = arrayFromJSON(mem, dt.Field(i).Type, arr.Children[i])
+ }
+
+ bldr.AppendValues(valids)
+ for i := range dt.Fields() {
+ fbldr := bldr.FieldBuilder(i)
+ buildArray(fbldr, fields[i])
+ fields[i].Release()
+ }
+
+ return bldr.NewArray()
+
+ case *arrow.FixedSizeBinaryType:
+ bldr := array.NewFixedSizeBinaryBuilder(mem, dt)
+ defer bldr.Release()
+ strdata := strFromJSON(arr.Data)
+ data := make([][]byte, len(strdata))
+ for i, v := range strdata {
+ if len(v) != 2*dt.ByteWidth {
+ panic(errors.Errorf("arrjson: invalid hex-string length (got=%d, want=%d)", len(v), 2*dt.ByteWidth))
+ }
+ vv, err := hex.DecodeString(v)
+ if err != nil {
+ panic(err)
+ }
+ data[i] = vv
+ }
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Date32Type:
+ bldr := array.NewDate32Builder(mem)
+ defer bldr.Release()
+ data := date32FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Date64Type:
+ bldr := array.NewDate64Builder(mem)
+ defer bldr.Release()
+ data := date64FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Time32Type:
+ bldr := array.NewTime32Builder(mem, dt)
+ defer bldr.Release()
+ data := time32FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.Time64Type:
+ bldr := array.NewTime64Builder(mem, dt)
+ defer bldr.Release()
+ data := time64FromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.TimestampType:
+ bldr := array.NewTimestampBuilder(mem, dt)
+ defer bldr.Release()
+ data := timestampFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.MonthIntervalType:
+ bldr := array.NewMonthIntervalBuilder(mem)
+ defer bldr.Release()
+ data := monthintervalFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.DayTimeIntervalType:
+ bldr := array.NewDayTimeIntervalBuilder(mem)
+ defer bldr.Release()
+ data := daytimeintervalFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ case *arrow.DurationType:
+ bldr := array.NewDurationBuilder(mem, dt)
+ defer bldr.Release()
+ data := durationFromJSON(arr.Data)
+ valids := validsFromJSON(arr.Valids)
+ bldr.AppendValues(data, valids)
+ return bldr.NewArray()
+
+ default:
+ panic(errors.Errorf("unknown data type %v %T", dt, dt))
+ }
+ panic("impossible")
+}
+
+func arrayToJSON(field arrow.Field, arr array.Interface) Array {
+ switch arr := arr.(type) {
+ case *array.Boolean:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: boolsToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Int8:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: i8ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Int16:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: i16ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Int32:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: i32ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Int64:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: i64ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Uint8:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: u8ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Uint16:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: u16ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Uint32:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: u32ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Uint64:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: u64ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Float16:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: f16ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Float32:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: f32ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Float64:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: f64ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.String:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: strToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Binary:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: bytesToJSON(arr),
+ Valids: validsToJSON(arr),
+ Offset: arr.ValueOffsets(),
+ }
+
+ case *array.List:
+ o := Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Valids: validsToJSON(arr),
+ Offset: arr.Offsets(),
+ Children: []Array{
+ arrayToJSON(arrow.Field{Name: "item", Type: arr.DataType().(*arrow.ListType).Elem()}, arr.ListValues()),
+ },
+ }
+ return o
+
+ case *array.FixedSizeList:
+ o := Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Valids: validsToJSON(arr),
+ Children: []Array{
+ arrayToJSON(arrow.Field{Name: "", Type: arr.DataType().(*arrow.FixedSizeListType).Elem()}, arr.ListValues()),
+ },
+ }
+ return o
+
+ case *array.Struct:
+ dt := arr.DataType().(*arrow.StructType)
+ o := Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Valids: validsToJSON(arr),
+ Children: make([]Array, len(dt.Fields())),
+ }
+ for i := range o.Children {
+ o.Children[i] = arrayToJSON(dt.Field(i), arr.Field(i))
+ }
+ return o
+
+ case *array.FixedSizeBinary:
+ dt := arr.DataType().(*arrow.FixedSizeBinaryType)
+ o := Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Valids: validsToJSON(arr),
+ Data: make([]interface{}, arr.Len()),
+ }
+ for i := range o.Data {
+ v := []byte(strings.ToUpper(hex.EncodeToString(arr.Value(i))))
+ if len(v) != 2*dt.ByteWidth {
+ panic(errors.Errorf("arrjson: invalid hex-string length (got=%d, want=%d)", len(v), 2*dt.ByteWidth))
+ }
+ o.Data[i] = string(v) // re-convert as string to prevent json.Marshal from base64-encoding it.
+ }
+ return o
+
+ case *array.Date32:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: date32ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Date64:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: date64ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Time32:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: time32ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Time64:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: time64ToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ case *array.Timestamp:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: timestampToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+ case *array.MonthInterval:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: monthintervalToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+ case *array.DayTimeInterval:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: daytimeintervalToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+ case *array.Duration:
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Data: durationToJSON(arr),
+ Valids: validsToJSON(arr),
+ }
+
+ default:
+ panic(errors.Errorf("unknown array type %T", arr))
+ }
+ panic("impossible")
+}
+
+func validsFromJSON(vs []int) []bool {
+ o := make([]bool, len(vs))
+ for i, v := range vs {
+ if v > 0 {
+ o[i] = true
+ }
+ }
+ return o
+}
+
+func validsToJSON(arr array.Interface) []int {
+ o := make([]int, arr.Len())
+ for i := range o {
+ if arr.IsValid(i) {
+ o[i] = 1
+ }
+ }
+ return o
+}
+
+func boolsFromJSON(vs []interface{}) []bool {
+ o := make([]bool, len(vs))
+ for i, v := range vs {
+ o[i] = v.(bool)
+ }
+ return o
+}
+
+func boolsToJSON(arr *array.Boolean) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func i8FromJSON(vs []interface{}) []int8 {
+ o := make([]int8, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = int8(vv)
+ }
+ return o
+}
+
+func i8ToJSON(arr *array.Int8) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func i16FromJSON(vs []interface{}) []int16 {
+ o := make([]int16, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = int16(vv)
+ }
+ return o
+}
+
+func i16ToJSON(arr *array.Int16) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func i32FromJSON(vs []interface{}) []int32 {
+ o := make([]int32, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = int32(vv)
+ }
+ return o
+}
+
+func i32ToJSON(arr *array.Int32) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func i64FromJSON(vs []interface{}) []int64 {
+ o := make([]int64, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = int64(vv)
+ }
+ return o
+}
+
+func i64ToJSON(arr *array.Int64) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func u8FromJSON(vs []interface{}) []uint8 {
+ o := make([]uint8, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = uint8(vv)
+ }
+ return o
+}
+
+func u8ToJSON(arr *array.Uint8) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func u16FromJSON(vs []interface{}) []uint16 {
+ o := make([]uint16, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = uint16(vv)
+ }
+ return o
+}
+
+func u16ToJSON(arr *array.Uint16) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func u32FromJSON(vs []interface{}) []uint32 {
+ o := make([]uint32, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = uint32(vv)
+ }
+ return o
+}
+
+func u32ToJSON(arr *array.Uint32) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func u64FromJSON(vs []interface{}) []uint64 {
+ o := make([]uint64, len(vs))
+ for i, v := range vs {
+ vv, err := strconv.ParseUint(v.(json.Number).String(), 10, 64)
+ if err != nil {
+ panic(err)
+ }
+ o[i] = uint64(vv)
+ }
+ return o
+}
+
+func u64ToJSON(arr *array.Uint64) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func f16FromJSON(vs []interface{}) []float16.Num {
+ o := make([]float16.Num, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Float64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = float16.New(float32(vv))
+ }
+ return o
+}
+
+func f16ToJSON(arr *array.Float16) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i).Float32()
+ }
+ return o
+}
+
+func f32FromJSON(vs []interface{}) []float32 {
+ o := make([]float32, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Float64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = float32(vv)
+ }
+ return o
+}
+
+func f32ToJSON(arr *array.Float32) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func f64FromJSON(vs []interface{}) []float64 {
+ o := make([]float64, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Float64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = float64(vv)
+ }
+ return o
+}
+
+func f64ToJSON(arr *array.Float64) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func strFromJSON(vs []interface{}) []string {
+ o := make([]string, len(vs))
+ for i, v := range vs {
+ switch v := v.(type) {
+ case string:
+ o[i] = v
+ case json.Number:
+ o[i] = v.String()
+ default:
+ panic(errors.Errorf("could not convert %v (%T) to a string", v, v))
+ }
+ }
+ return o
+}
+
+func strToJSON(arr *array.String) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func bytesFromJSON(vs []interface{}) [][]byte {
+ o := make([][]byte, len(vs))
+ for i, v := range vs {
+ var err error
+ switch v := v.(type) {
+ case string:
+ o[i], err = hex.DecodeString(v)
+ case json.Number:
+ o[i], err = hex.DecodeString(v.String())
+ default:
+ panic(errors.Errorf("could not convert %v (%T) to a string", v, v))
+ }
+ if err != nil {
+ panic(errors.Errorf("could not decode %v: %v", v, err))
+ }
+ }
+ return o
+}
+
+func bytesToJSON(arr *array.Binary) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = strings.ToUpper(hex.EncodeToString(arr.Value(i)))
+ }
+ return o
+}
+
+func date32FromJSON(vs []interface{}) []arrow.Date32 {
+ o := make([]arrow.Date32, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Date32(vv)
+ }
+ return o
+}
+
+func date32ToJSON(arr *array.Date32) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int32(arr.Value(i))
+ }
+ return o
+}
+
+func date64FromJSON(vs []interface{}) []arrow.Date64 {
+ o := make([]arrow.Date64, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Date64(vv)
+ }
+ return o
+}
+
+func date64ToJSON(arr *array.Date64) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int64(arr.Value(i))
+ }
+ return o
+}
+
+func time32FromJSON(vs []interface{}) []arrow.Time32 {
+ o := make([]arrow.Time32, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Time32(vv)
+ }
+ return o
+}
+
+func time32ToJSON(arr *array.Time32) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int32(arr.Value(i))
+ }
+ return o
+}
+
+func time64FromJSON(vs []interface{}) []arrow.Time64 {
+ o := make([]arrow.Time64, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Time64(vv)
+ }
+ return o
+}
+
+func time64ToJSON(arr *array.Time64) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int64(arr.Value(i))
+ }
+ return o
+}
+
+func timestampFromJSON(vs []interface{}) []arrow.Timestamp {
+ o := make([]arrow.Timestamp, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Timestamp(vv)
+ }
+ return o
+}
+
+func timestampToJSON(arr *array.Timestamp) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int64(arr.Value(i))
+ }
+ return o
+}
+
+func monthintervalFromJSON(vs []interface{}) []arrow.MonthInterval {
+ o := make([]arrow.MonthInterval, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.MonthInterval(int32(vv))
+ }
+ return o
+}
+
+func monthintervalToJSON(arr *array.MonthInterval) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = int32(arr.Value(i))
+ }
+ return o
+}
+
+func daytimeintervalFromJSON(vs []interface{}) []arrow.DayTimeInterval {
+ o := make([]arrow.DayTimeInterval, len(vs))
+ for i, vv := range vs {
+ v := vv.(map[string]interface{})
+ days, err := v["days"].(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ ms, err := v["milliseconds"].(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.DayTimeInterval{Days: int32(days), Milliseconds: int32(ms)}
+ }
+ return o
+}
+
+func daytimeintervalToJSON(arr *array.DayTimeInterval) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func durationFromJSON(vs []interface{}) []arrow.Duration {
+ o := make([]arrow.Duration, len(vs))
+ for i, v := range vs {
+ vv, err := v.(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+ o[i] = arrow.Duration(vv)
+ }
+ return o
+}
+
+func durationToJSON(arr *array.Duration) []interface{} {
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ o[i] = arr.Value(i)
+ }
+ return o
+}
+
+func buildArray(bldr array.Builder, data array.Interface) {
+ defer data.Release()
+
+ switch bldr := bldr.(type) {
+ default:
+ panic(errors.Errorf("unknown builder %T", bldr))
+
+ case *array.BooleanBuilder:
+ data := data.(*array.Boolean)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Int8Builder:
+ data := data.(*array.Int8)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Int16Builder:
+ data := data.(*array.Int16)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Int32Builder:
+ data := data.(*array.Int32)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Int64Builder:
+ data := data.(*array.Int64)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Uint8Builder:
+ data := data.(*array.Uint8)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Uint16Builder:
+ data := data.(*array.Uint16)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Uint32Builder:
+ data := data.(*array.Uint32)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Uint64Builder:
+ data := data.(*array.Uint64)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Float32Builder:
+ data := data.(*array.Float32)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.Float64Builder:
+ data := data.(*array.Float64)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+
+ case *array.StringBuilder:
+ data := data.(*array.String)
+ for i := 0; i < data.Len(); i++ {
+ switch {
+ case data.IsValid(i):
+ bldr.Append(data.Value(i))
+ default:
+ bldr.AppendNull()
+ }
+ }
+ }
+}
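
(Aside: buildArray above repeats the same null-preserving, element-by-element copy for every concrete builder type. A standalone sketch of that pattern, using Int32 and made-up input data, could look like the following; it only relies on the public array and memory packages.)

package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	// Build a source array with a null in the middle (illustrative data).
	src := array.NewInt32Builder(mem)
	defer src.Release()
	src.AppendValues([]int32{1, 2, 3}, []bool{true, false, true})
	arr := src.NewInt32Array()
	defer arr.Release()

	// Null-preserving copy, element by element, as buildArray does per type.
	dst := array.NewInt32Builder(mem)
	defer dst.Release()
	for i := 0; i < arr.Len(); i++ {
		if arr.IsValid(i) {
			dst.Append(arr.Value(i))
		} else {
			dst.AppendNull()
		}
	}
	out := dst.NewInt32Array()
	defer out.Release()

	fmt.Println(out) // [1 (null) 3]
}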
diff --git a/go/arrow/internal/arrjson/arrjson_test.go b/go/arrow/internal/arrjson/arrjson_test.go
new file mode 100644
index 0000000..61dfd53
--- /dev/null
+++ b/go/arrow/internal/arrjson/arrjson_test.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/internal/arrdata"
+ "github.com/apache/arrow/go/arrow/memory"
+)
+
+func TestReadWrite(t *testing.T) {
+ for name, recs := range arrdata.Records {
+ t.Run(name, func(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ f, err := ioutil.TempFile("", "arrjson-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ defer os.RemoveAll(f.Name())
+
+ w, err := NewWriter(f, recs[0].Schema())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer w.Close()
+
+ for i, rec := range recs {
+ err = w.Write(rec)
+ if err != nil {
+ t.Fatalf("could not write record[%d] to JSON: %v", i, err)
+ }
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Fatalf("could not close JSON writer: %v", err)
+ }
+
+ err = f.Sync()
+ if err != nil {
+ t.Fatalf("could not sync data to disk: %v", err)
+ }
+
+ _, err = f.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatalf("could not rewind file: %v", err)
+ }
+
+ r, err := NewReader(f, WithAllocator(mem), WithSchema(recs[0].Schema()))
+ if err != nil {
+ raw, _ := ioutil.ReadFile(f.Name())
+ t.Fatalf("could not read JSON file: %v\n%v\n", err, string(raw))
+ }
+ defer r.Release()
+
+ r.Retain()
+ r.Release()
+
+ if got, want := r.Schema(), recs[0].Schema(); !got.Equal(want) {
+ t.Fatalf("invalid schema\ngot:\n%v\nwant:\n%v\n", got, want)
+ }
+
+ if got, want := r.NumRecords(), len(recs); got != want {
+ t.Fatalf("invalid number of records: got=%d, want=%d", got, want)
+ }
+
+ nrecs := 0
+ for {
+ rec, err := r.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ t.Fatalf("could not read record[%d]: %v", nrecs, err)
+ }
+
+ if !array.RecordEqual(rec, recs[nrecs]) {
+ t.Fatalf("records[%d] differ", nrecs)
+ }
+ nrecs++
+ }
+
+ if got, want := nrecs, len(recs); got != want {
+ t.Fatalf("invalid number of records: got=%d, want=%d", got, want)
+ }
+ })
+ }
+}
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/internal/arrjson/option.go
similarity index 54%
copy from go/arrow/ipc/ipc.go
copy to go/arrow/internal/arrjson/option.go
index 470baf2..ea89f6e 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/internal/arrjson/option.go
@@ -14,55 +14,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package ipc // import "github.com/apache/arrow/go/arrow/ipc"
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
import (
- "io"
-
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/memory"
)
-const (
- errNotArrowFile = errString("arrow/ipc: not an Arrow file")
- errInconsistentFileMetadata = errString("arrow/ipc: file is smaller than indicated metadata size")
- errInconsistentSchema = errString("arrow/ipc: tried to write record batch with different schema")
- errMaxRecursion = errString("arrow/ipc: max recursion depth reached")
- errBigArray = errString("arrow/ipc: array larger than 2^31-1 in length")
-
- kArrowAlignment = 64 // buffers are padded to 64b boundaries (for SIMD)
- kTensorAlignment = 64 // tensors are padded to 64b boundaries
- kArrowIPCAlignment = 8 // align on 8b boundaries in IPC
-)
-
-var (
- paddingBytes [kArrowAlignment]byte
- kEOS = [4]byte{0, 0, 0, 0} // end of stream message
-)
-
-func paddedLength(nbytes int64, alignment int32) int64 {
- align := int64(alignment)
- return ((nbytes + align - 1) / align) * align
-}
-
-type errString string
-
-func (s errString) Error() string {
- return string(s)
-}
-
-type ReadAtSeeker interface {
- io.Reader
- io.Seeker
- io.ReaderAt
-}
-
type config struct {
alloc memory.Allocator
schema *arrow.Schema
- footer struct {
- offset int64
- }
}
func newConfig(opts ...Option) *config {
@@ -81,13 +42,6 @@ func newConfig(opts ...Option) *config {
// and streams.
type Option func(*config)
-// WithFooterOffset specifies the Arrow footer position in bytes.
-func WithFooterOffset(offset int64) Option {
- return func(cfg *config) {
- cfg.footer.offset = offset
- }
-}
-
// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
return func(cfg *config) {
diff --git a/go/arrow/internal/arrjson/reader.go b/go/arrow/internal/arrjson/reader.go
new file mode 100644
index 0000000..cb95328
--- /dev/null
+++ b/go/arrow/internal/arrjson/reader.go
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+ "encoding/json"
+ "io"
+ "sync/atomic"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/internal/debug"
+)
+
+type Reader struct {
+ refs int64
+
+ schema *arrow.Schema
+ recs []array.Record
+
+ irec int // current record index. used for the arrio.Reader interface.
+}
+
+func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
+ dec := json.NewDecoder(r)
+ dec.UseNumber()
+ var raw struct {
+ Schema Schema `json:"schema"`
+ Records []Record `json:"batches"`
+ }
+ err := dec.Decode(&raw)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg := newConfig()
+ for _, opt := range opts {
+ opt(cfg)
+ }
+
+ schema := schemaFromJSON(raw.Schema)
+ rr := &Reader{
+ refs: 1,
+ schema: schema,
+ recs: recordsFromJSON(cfg.alloc, schema, raw.Records),
+ }
+ return rr, nil
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (r *Reader) Retain() {
+ atomic.AddInt64(&r.refs, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (r *Reader) Release() {
+ debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+
+ if atomic.AddInt64(&r.refs, -1) == 0 {
+ for i, rec := range r.recs {
+ if r.recs[i] != nil {
+ rec.Release()
+ r.recs[i] = nil
+ }
+ }
+ }
+}
+func (r *Reader) Schema() *arrow.Schema { return r.schema }
+func (r *Reader) NumRecords() int { return len(r.recs) }
+
+func (r *Reader) Read() (array.Record, error) {
+ if r.irec == r.NumRecords() {
+ return nil, io.EOF
+ }
+ rec := r.recs[r.irec]
+ r.irec++
+ return rec, nil
+}
+
+var (
+ _ arrio.Reader = (*Reader)(nil)
+)
diff --git a/go/arrow/internal/arrjson/writer.go b/go/arrow/internal/arrjson/writer.go
new file mode 100644
index 0000000..d4f9101
--- /dev/null
+++ b/go/arrow/internal/arrjson/writer.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+ "encoding/json"
+ "io"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+)
+
+const (
+ jsonIndent = " "
+ jsonPrefix = " "
+ jsonRecPrefix = " "
+)
+
+type Writer struct {
+ w io.Writer
+
+ schema *arrow.Schema
+ nrecs int64
+}
+
+func NewWriter(w io.Writer, schema *arrow.Schema) (*Writer, error) {
+ ww := &Writer{
+ w: w,
+ schema: schema,
+ }
+ _, err := ww.w.Write([]byte("{\n"))
+ if err != nil {
+ return nil, err
+ }
+
+ err = ww.writeSchema()
+ if err != nil {
+ return nil, err
+ }
+ return ww, nil
+}
+
+func (w *Writer) Write(rec array.Record) error {
+ switch {
+ case w.nrecs == 0:
+ _, err := w.w.Write([]byte(",\n" + jsonPrefix + `"batches": [` + "\n" + jsonRecPrefix))
+ if err != nil {
+ return err
+ }
+ case w.nrecs > 0:
+ _, err := w.w.Write([]byte(",\n"))
+ if err != nil {
+ return err
+ }
+ }
+
+ raw, err := json.MarshalIndent(recordToJSON(rec), jsonRecPrefix, jsonIndent)
+ if err != nil {
+ return err
+ }
+
+ _, err = w.w.Write(raw)
+ if err != nil {
+ return err
+ }
+
+ w.nrecs++
+ return nil
+}
+
+func (w *Writer) writeSchema() error {
+ _, err := w.w.Write([]byte(` "schema": `))
+ if err != nil {
+ return err
+ }
+ raw, err := json.MarshalIndent(schemaToJSON(w.schema), jsonPrefix, jsonIndent)
+ if err != nil {
+ return err
+ }
+ _, err = w.w.Write(raw)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (w *Writer) Close() error {
+ _, err := w.w.Write([]byte("\n ]\n}"))
+ return err
+}
+
+var (
+ _ arrio.Writer = (*Writer)(nil)
+)
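
(Aside: putting the reader and writer together, a round-trip through the arrjson format looks roughly like the sketch below. It mirrors the TestReadWrite test earlier in this patch; note that arrjson and arrdata are internal packages, so such code only compiles inside the arrow module, and the in-memory buffer and choice of record set are illustrative assumptions.)

package main

import (
	"bytes"
	"io"
	"log"

	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/internal/arrdata"
	"github.com/apache/arrow/go/arrow/internal/arrjson"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	recs := arrdata.Records["primitives"] // sample records used by the tests in this patch

	// Write all records to an in-memory JSON document.
	buf := new(bytes.Buffer)
	w, err := arrjson.NewWriter(buf, recs[0].Schema())
	if err != nil {
		log.Fatal(err)
	}
	for _, rec := range recs {
		if err := w.Write(rec); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read them back and compare against the originals.
	r, err := arrjson.NewReader(buf, arrjson.WithAllocator(mem), arrjson.WithSchema(recs[0].Schema()))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Release()

	for i := 0; ; i++ {
		rec, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		if !array.RecordEqual(rec, recs[i]) {
			log.Fatalf("records[%d] differ", i)
		}
	}
}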
diff --git a/go/arrow/ipc/cmd/arrow-cat/main.go b/go/arrow/ipc/cmd/arrow-cat/main.go
index e5f75e7..9219b68 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main.go
@@ -86,9 +86,7 @@ func main() {
}
func processStream(w io.Writer, rin io.Reader) error {
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(nil, 0)
-
+ mem := memory.NewGoAllocator()
for {
r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
if err != nil {
@@ -142,8 +140,7 @@ func processFile(w io.Writer, fname string) error {
return processStream(w, f)
}
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(nil, 0)
+ mem := memory.NewGoAllocator()
r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
if err != nil {
diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go
index 8f8e382..65a3691 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go
@@ -126,27 +126,36 @@ record 3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
- col[5] "timestamp": [0 (null) (null) 3 4]
- col[6] "date32s": [-2 (null) (null) 1 2]
- col[7] "date64s": [-2 (null) (null) 1 2]
+ col[5] "timestamp_s": [0 (null) (null) 3 4]
+ col[6] "timestamp_ms": [0 (null) (null) 3 4]
+ col[7] "timestamp_us": [0 (null) (null) 3 4]
+ col[8] "timestamp_ns": [0 (null) (null) 3 4]
+ col[9] "date32s": [-2 (null) (null) 1 2]
+ col[10] "date64s": [-2 (null) (null) 1 2]
record 2...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
- col[5] "timestamp": [10 (null) (null) 13 14]
- col[6] "date32s": [-12 (null) (null) 11 12]
- col[7] "date64s": [-12 (null) (null) 11 12]
+ col[5] "timestamp_s": [10 (null) (null) 13 14]
+ col[6] "timestamp_ms": [10 (null) (null) 13 14]
+ col[7] "timestamp_us": [10 (null) (null) 13 14]
+ col[8] "timestamp_ns": [10 (null) (null) 13 14]
+ col[9] "date32s": [-12 (null) (null) 11 12]
+ col[10] "date64s": [-12 (null) (null) 11 12]
record 3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
- col[5] "timestamp": [20 (null) (null) 23 24]
- col[6] "date32s": [-22 (null) (null) 21 22]
- col[7] "date64s": [-22 (null) (null) 21 22]
+ col[5] "timestamp_s": [20 (null) (null) 23 24]
+ col[6] "timestamp_ms": [20 (null) (null) 23 24]
+ col[7] "timestamp_us": [20 (null) (null) 23 24]
+ col[8] "timestamp_ns": [20 (null) (null) 23 24]
+ col[9] "date32s": [-22 (null) (null) 21 22]
+ col[10] "date64s": [-22 (null) (null) 21 22]
`,
},
{
@@ -405,27 +414,36 @@ record 3/3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
- col[5] "timestamp": [0 (null) (null) 3 4]
- col[6] "date32s": [-2 (null) (null) 1 2]
- col[7] "date64s": [-2 (null) (null) 1 2]
+ col[5] "timestamp_s": [0 (null) (null) 3 4]
+ col[6] "timestamp_ms": [0 (null) (null) 3 4]
+ col[7] "timestamp_us": [0 (null) (null) 3 4]
+ col[8] "timestamp_ns": [0 (null) (null) 3 4]
+ col[9] "date32s": [-2 (null) (null) 1 2]
+ col[10] "date64s": [-2 (null) (null) 1 2]
record 2...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
- col[5] "timestamp": [10 (null) (null) 13 14]
- col[6] "date32s": [-12 (null) (null) 11 12]
- col[7] "date64s": [-12 (null) (null) 11 12]
+ col[5] "timestamp_s": [10 (null) (null) 13 14]
+ col[6] "timestamp_ms": [10 (null) (null) 13 14]
+ col[7] "timestamp_us": [10 (null) (null) 13 14]
+ col[8] "timestamp_ns": [10 (null) (null) 13 14]
+ col[9] "date32s": [-12 (null) (null) 11 12]
+ col[10] "date64s": [-12 (null) (null) 11 12]
record 3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
- col[5] "timestamp": [20 (null) (null) 23 24]
- col[6] "date32s": [-22 (null) (null) 21 22]
- col[7] "date64s": [-22 (null) (null) 21 22]
+ col[5] "timestamp_s": [20 (null) (null) 23 24]
+ col[6] "timestamp_ms": [20 (null) (null) 23 24]
+ col[7] "timestamp_us": [20 (null) (null) 23 24]
+ col[8] "timestamp_ns": [20 (null) (null) 23 24]
+ col[9] "date32s": [-22 (null) (null) 21 22]
+ col[10] "date64s": [-22 (null) (null) 21 22]
`,
},
{
@@ -437,27 +455,36 @@ record 1/3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
- col[5] "timestamp": [0 (null) (null) 3 4]
- col[6] "date32s": [-2 (null) (null) 1 2]
- col[7] "date64s": [-2 (null) (null) 1 2]
+ col[5] "timestamp_s": [0 (null) (null) 3 4]
+ col[6] "timestamp_ms": [0 (null) (null) 3 4]
+ col[7] "timestamp_us": [0 (null) (null) 3 4]
+ col[8] "timestamp_ns": [0 (null) (null) 3 4]
+ col[9] "date32s": [-2 (null) (null) 1 2]
+ col[10] "date64s": [-2 (null) (null) 1 2]
record 2/3...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
- col[5] "timestamp": [10 (null) (null) 13 14]
- col[6] "date32s": [-12 (null) (null) 11 12]
- col[7] "date64s": [-12 (null) (null) 11 12]
+ col[5] "timestamp_s": [10 (null) (null) 13 14]
+ col[6] "timestamp_ms": [10 (null) (null) 13 14]
+ col[7] "timestamp_us": [10 (null) (null) 13 14]
+ col[8] "timestamp_ns": [10 (null) (null) 13 14]
+ col[9] "date32s": [-12 (null) (null) 11 12]
+ col[10] "date64s": [-12 (null) (null) 11 12]
record 3/3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
- col[5] "timestamp": [20 (null) (null) 23 24]
- col[6] "date32s": [-22 (null) (null) 21 22]
- col[7] "date64s": [-22 (null) (null) 21 22]
+ col[5] "timestamp_s": [20 (null) (null) 23 24]
+ col[6] "timestamp_ms": [20 (null) (null) 23 24]
+ col[7] "timestamp_us": [20 (null) (null) 23 24]
+ col[8] "timestamp_ns": [20 (null) (null) 23 24]
+ col[9] "date32s": [-22 (null) (null) 21 22]
+ col[10] "date64s": [-22 (null) (null) 21 22]
`,
},
{
diff --git a/go/arrow/ipc/cmd/arrow-file-to-stream/main.go b/go/arrow/ipc/cmd/arrow-file-to-stream/main.go
new file mode 100644
index 0000000..09aadac
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-file-to-stream/main.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-file-to-stream"
+
+import (
+ "flag"
+ "io"
+ "log"
+ "os"
+
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/pkg/errors"
+)
+
+func main() {
+ log.SetPrefix("arrow-file-to-stream: ")
+ log.SetFlags(0)
+
+ flag.Parse()
+
+ if flag.NArg() != 1 {
+ flag.Usage()
+ log.Fatalf("missing path to input ARROW file")
+ }
+
+ err := processFile(os.Stdout, flag.Arg(0))
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func processFile(w io.Writer, fname string) error {
+ r, err := os.Open(fname)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer r.Close()
+
+ mem := memory.NewGoAllocator()
+
+ rr, err := ipc.NewFileReader(r, ipc.WithAllocator(mem))
+ if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
+ return err
+ }
+ defer rr.Close()
+
+ ww := ipc.NewWriter(w, ipc.WithAllocator(mem), ipc.WithSchema(rr.Schema()))
+ defer ww.Close()
+
+ n, err := arrio.Copy(ww, rr)
+ if err != nil {
+ return errors.Wrap(err, "could not copy ARROW stream")
+ }
+ if got, want := n, int64(rr.NumRecords()); got != want {
+ return errors.Errorf("invalid number of records written (got=%d, want=%d)", got, want)
+ }
+
+ err = ww.Close()
+ if err != nil {
+ return errors.Wrap(err, "could not close output ARROW stream")
+ }
+
+ return nil
+}
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
similarity index 57%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
index 64ea6cc..e32bf48 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-file-to-stream"
import (
"io"
@@ -23,11 +23,10 @@ import (
"testing"
"github.com/apache/arrow/go/arrow/internal/arrdata"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
-func TestStream(t *testing.T) {
+func TestFileToStream(t *testing.T) {
for name, recs := range arrdata.Records {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -40,53 +39,30 @@ func TestStream(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())
- {
- w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- defer w.Close()
+ arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
- for i, rec := range recs {
- err = w.Write(rec)
- if err != nil {
- t.Fatalf("could not write record[%d]: %v", i, err)
- }
- }
-
- err = w.Close()
- if err != nil {
- t.Fatal(err)
- }
+ o, err := ioutil.TempFile("", "arrow-ipc-")
+ if err != nil {
+ t.Fatal(err)
}
+ defer os.Remove(o.Name())
- err = f.Sync()
+ err = processFile(o, f.Name())
if err != nil {
- t.Fatalf("could not sync data to disk: %v", err)
+ t.Fatal(err)
}
- _, err = f.Seek(0, io.SeekStart)
+ err = o.Sync()
if err != nil {
- t.Fatalf("could not seek to start: %v", err)
+ t.Fatal(err)
}
- {
- r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer r.Release()
-
- n := 0
- for r.Next() {
- rec := r.Record()
- if !cmpRecs(rec, recs[n]) {
- t.Fatalf("records[%d] differ", n)
- }
- n++
- }
-
- if len(recs) != n {
- t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
- }
+ _, err = o.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatal(err)
}
+
+ arrdata.CheckArrowStream(t, o, mem, recs[0].Schema(), recs)
})
}
}
diff --git a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
new file mode 100644
index 0000000..fe6d668
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-json-integration-test"
+
+import (
+ "flag"
+ "log"
+ "os"
+
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/internal/arrjson"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/pkg/errors"
+)
+
+func main() {
+ log.SetPrefix("arrow-json: ")
+ log.SetFlags(0)
+
+ var (
+ arrowPath = flag.String("arrow", "", "path to ARROW file")
+ jsonPath = flag.String("json", "", "path to JSON file")
+ mode = flag.String("mode", "VALIDATE", "mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)")
+ verbose = flag.Bool("verbose", true, "enable/disable verbose mode")
+ )
+
+ flag.Parse()
+
+ err := runCommand(*jsonPath, *arrowPath, *mode, *verbose)
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func runCommand(jsonName, arrowName, mode string, verbose bool) error {
+ if jsonName == "" {
+ return errors.Errorf("must specify json file name")
+ }
+
+ if arrowName == "" {
+ return errors.Errorf("must specify arrow file name")
+ }
+
+ switch mode {
+ case "ARROW_TO_JSON":
+ return cnvToJSON(arrowName, jsonName, verbose)
+ case "JSON_TO_ARROW":
+ return cnvToARROW(arrowName, jsonName, verbose)
+ case "VALIDATE":
+ return validate(arrowName, jsonName, verbose)
+ default:
+ return errors.Errorf("unknown command %q", mode)
+ }
+
+ return nil
+}
+
+func cnvToJSON(arrowName, jsonName string, verbose bool) error {
+ r, err := os.Open(arrowName)
+ if err != nil {
+ return errors.Wrapf(err, "could not open ARROW file %q", arrowName)
+ }
+ defer r.Close()
+
+ w, err := os.Create(jsonName)
+ if err != nil {
+ return errors.Wrapf(err, "could not create JSON file %q", jsonName)
+ }
+ defer w.Close()
+
+ rr, err := ipc.NewFileReader(r)
+ if err != nil {
+ return errors.Wrapf(err, "could not open ARROW file reader from file %q", arrowName)
+ }
+ defer rr.Close()
+
+ if verbose {
+ log.Printf("found schema:\n%v\n", rr.Schema())
+ }
+
+ ww, err := arrjson.NewWriter(w, rr.Schema())
+ if err != nil {
+ return errors.Wrap(err, "could not create JSON encoder")
+ }
+ defer ww.Close()
+
+ n, err := arrio.Copy(ww, rr)
+ if err != nil {
+ return errors.Wrap(err, "could not convert ARROW file reader data to JSON data")
+ }
+
+ if got, want := n, int64(rr.NumRecords()); got != want {
+ return errors.Errorf("invalid number of records copied (got=%d, want=%d", got, want)
+ }
+
+ err = ww.Close()
+ if err != nil {
+ return errors.Wrapf(err, "could not close JSON encoder %q", jsonName)
+ }
+
+ err = w.Close()
+ if err != nil {
+ return errors.Wrapf(err, "could not close JSON file %q", jsonName)
+ }
+
+ return nil
+}
+
+func cnvToARROW(arrowName, jsonName string, verbose bool) error {
+ r, err := os.Open(jsonName)
+ if err != nil {
+ return errors.Wrapf(err, "could not open JSON file %q", jsonName)
+ }
+ defer r.Close()
+
+ w, err := os.Create(arrowName)
+ if err != nil {
+ return errors.Wrapf(err, "could not create ARROW file %q", arrowName)
+ }
+ defer w.Close()
+
+ rr, err := arrjson.NewReader(r)
+ if err != nil {
+ return errors.Wrapf(err, "could not open JSON file reader from file %q", jsonName)
+ }
+
+ if verbose {
+ log.Printf("found schema:\n%v\n", rr.Schema())
+ }
+
+ ww, err := ipc.NewFileWriter(w, ipc.WithSchema(rr.Schema()))
+ if err != nil {
+ return errors.Wrap(err, "could not create ARROW file writer")
+ }
+ defer ww.Close()
+
+ n, err := arrio.Copy(ww, rr)
+ if err != nil {
+ return errors.Wrap(err, "could not convert JSON data to ARROW data")
+ }
+
+ if got, want := n, int64(rr.NumRecords()); got != want {
+ return errors.Errorf("invalid number of records copied (got=%d, want=%d", got, want)
+ }
+
+ err = ww.Close()
+ if err != nil {
+ return errors.Wrapf(err, "could not close ARROW file writer %q", arrowName)
+ }
+
+ err = w.Close()
+ if err != nil {
+ return errors.Wrapf(err, "could not close ARROW file %q", arrowName)
+ }
+
+ return nil
+}
+
+func validate(arrowName, jsonName string, verbose bool) error {
+ jr, err := os.Open(jsonName)
+ if err != nil {
+ return errors.Wrapf(err, "could not open JSON file %q", jsonName)
+ }
+ defer jr.Close()
+
+ jrr, err := arrjson.NewReader(jr)
+ if err != nil {
+ return errors.Wrapf(err, "could not open JSON file reader from file %q", jsonName)
+ }
+
+ ar, err := os.Open(arrowName)
+ if err != nil {
+ return errors.Wrapf(err, "could not open ARROW file %q", arrowName)
+ }
+ defer ar.Close()
+
+ arr, err := ipc.NewFileReader(ar)
+ if err != nil {
+ return errors.Wrapf(err, "could not open ARROW file reader from file %q", arrowName)
+ }
+ defer arr.Close()
+
+ if !arr.Schema().Equal(jrr.Schema()) {
+ if verbose {
+ log.Printf("JSON schema:\n%v\nArrow schema:\n%v\n", arr.Schema(), jrr.Schema())
+ }
+ return errors.Errorf("schemas did not match")
+ }
+
+ for i := 0; i < arr.NumRecords(); i++ {
+ arec, err := arr.Read()
+ if err != nil {
+ return errors.Wrapf(err, "could not read record %d from ARROW file", i)
+ }
+ jrec, err := jrr.Read()
+ if err != nil {
+ return errors.Wrapf(err, "could not read record %d from JSON file", i)
+ }
+ if !array.RecordApproxEqual(jrec, arec) {
+ return errors.Errorf("record batch %d did not match\nJSON:\n%v\nARROW:\n%v\n",
+ i, jrec, arec,
+ )
+ }
+ }
+
+ if jn, an := jrr.NumRecords(), arr.NumRecords(); jn != an {
+ return errors.Errorf("different number of record batches: %d (JSON) vs %d (Arrow)", jn, an)
+ }
+
+ return nil
+}
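
(Aside: the integration harness drives this binary through the -json, -arrow and -mode flags; exercised programmatically, the three modes chain together roughly as in the hypothetical sketch below. runCommand is unexported, so such a driver would have to sit next to main.go in this package, and the testdata paths are placeholders.)

package main

import "testing"

func TestRunCommandModes(t *testing.T) {
	for _, mode := range []string{"JSON_TO_ARROW", "VALIDATE", "ARROW_TO_JSON"} {
		if err := runCommand("testdata/example.json", "testdata/example.arrow", mode, true); err != nil {
			t.Fatalf("%s failed: %v", mode, err)
		}
	}
}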
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
similarity index 51%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
index 64ea6cc..a7604a8 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
@@ -14,79 +14,75 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-json-integration-test"
import (
- "io"
"io/ioutil"
"os"
"testing"
"github.com/apache/arrow/go/arrow/internal/arrdata"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
-func TestStream(t *testing.T) {
+func TestIntegration(t *testing.T) {
+ const verbose = true
for name, recs := range arrdata.Records {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
- f, err := ioutil.TempFile("", "arrow-ipc-")
+ af1, err := ioutil.TempFile("", "arrow-json-integration-")
if err != nil {
t.Fatal(err)
}
- defer f.Close()
- defer os.Remove(f.Name())
+ defer af1.Close()
+ defer os.RemoveAll(af1.Name())
- {
- w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- defer w.Close()
+ arrdata.WriteFile(t, af1, mem, recs[0].Schema(), recs)
+ arrdata.CheckArrowFile(t, af1, mem, recs[0].Schema(), recs)
- for i, rec := range recs {
- err = w.Write(rec)
- if err != nil {
- t.Fatalf("could not write record[%d]: %v", i, err)
- }
- }
+ aj, err := ioutil.TempFile("", "arrow-json-integration-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer aj.Close()
+ defer os.RemoveAll(aj.Name())
- err = w.Close()
- if err != nil {
- t.Fatal(err)
- }
+ err = cnvToJSON(af1.Name(), aj.Name(), verbose)
+ if err != nil {
+ t.Fatal(err)
}
- err = f.Sync()
+ err = validate(af1.Name(), aj.Name(), verbose)
if err != nil {
- t.Fatalf("could not sync data to disk: %v", err)
+ t.Fatal(err)
}
- _, err = f.Seek(0, io.SeekStart)
+ af2, err := ioutil.TempFile("", "arrow-json-integration-")
if err != nil {
- t.Fatalf("could not seek to start: %v", err)
+ t.Fatal(err)
}
+ af2.Close()
+ os.RemoveAll(af2.Name())
- {
- r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer r.Release()
+ err = cnvToARROW(af2.Name(), aj.Name(), verbose)
+ if err != nil {
+ t.Fatal(err)
+ }
- n := 0
- for r.Next() {
- rec := r.Record()
- if !cmpRecs(rec, recs[n]) {
- t.Fatalf("records[%d] differ", n)
- }
- n++
- }
+ err = validate(af2.Name(), aj.Name(), verbose)
+ if err != nil {
+ t.Fatal(err)
+ }
- if len(recs) != n {
- t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
- }
+ af2, err = os.Open(af2.Name())
+ if err != nil {
+ t.Fatal(err)
}
+ defer af2.Close()
+
+ arrdata.CheckArrowFile(t, af2, mem, recs[0].Schema(), recs)
})
}
}
diff --git a/go/arrow/ipc/cmd/arrow-ls/main.go b/go/arrow/ipc/cmd/arrow-ls/main.go
index 43460f9..a0f235f 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main.go
@@ -84,8 +84,7 @@ func main() {
}
func processStream(w io.Writer, rin io.Reader) error {
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(nil, 0)
+ mem := memory.NewGoAllocator()
for {
r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
@@ -138,8 +137,7 @@ func processFile(w io.Writer, fname string) error {
return processStream(w, f)
}
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(nil, 0)
+ mem := memory.NewGoAllocator()
r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
if err != nil {
diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go
index 0488eae..71db468 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go
@@ -90,13 +90,16 @@ records: 3
{
name: "fixed_width_types",
want: `schema:
- fields: 8
+ fields: 11
- float16s: type=float16, nullable
- time32ms: type=time32[ms], nullable
- time32s: type=time32[s], nullable
- time64ns: type=time64[ns], nullable
- time64us: type=time64[us], nullable
- - timestamp: type=timestamp[ns], nullable
+ - timestamp_s: type=timestamp[s, tz=UTC], nullable
+ - timestamp_ms: type=timestamp[ms, tz=UTC], nullable
+ - timestamp_us: type=timestamp[us, tz=UTC], nullable
+ - timestamp_ns: type=timestamp[ns, tz=UTC], nullable
- date32s: type=date32, nullable
- date64s: type=date64, nullable
records: 3
diff --git a/go/arrow/ipc/cmd/arrow-stream-to-file/main.go b/go/arrow/ipc/cmd/arrow-stream-to-file/main.go
new file mode 100644
index 0000000..3dc1ec6
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-stream-to-file/main.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-stream-to-file"
+
+import (
+ "flag"
+ "io"
+ "log"
+ "os"
+
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/pkg/errors"
+)
+
+func main() {
+ log.SetPrefix("arrow-stream-to-file: ")
+ log.SetFlags(0)
+
+ flag.Parse()
+
+ err := processStream(os.Stdout, os.Stdin)
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func processStream(w *os.File, r io.Reader) error {
+ mem := memory.NewGoAllocator()
+
+ rr, err := ipc.NewReader(r, ipc.WithAllocator(mem))
+ if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
+ return err
+ }
+
+ ww, err := ipc.NewFileWriter(w, ipc.WithAllocator(mem), ipc.WithSchema(rr.Schema()))
+ if err != nil {
+ return errors.Wrap(err, "could not create ARROW file writer")
+ }
+ defer ww.Close()
+
+ _, err = arrio.Copy(ww, rr)
+ if err != nil {
+ return errors.Wrap(err, "could not copy ARROW stream")
+ }
+
+ err = ww.Close()
+ if err != nil {
+ return errors.Wrap(err, "could not close output ARROW file")
+ }
+
+ return nil
+}
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
similarity index 59%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
index 64ea6cc..fc7d995 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-stream-to-file"
import (
"io"
@@ -23,11 +23,10 @@ import (
"testing"
"github.com/apache/arrow/go/arrow/internal/arrdata"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
-func TestStream(t *testing.T) {
+func TestStreamToFile(t *testing.T) {
for name, recs := range arrdata.Records {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -40,53 +39,40 @@ func TestStream(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())
- {
- w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- defer w.Close()
-
- for i, rec := range recs {
- err = w.Write(rec)
- if err != nil {
- t.Fatalf("could not write record[%d]: %v", i, err)
- }
- }
-
- err = w.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
+ arrdata.WriteStream(t, f, mem, recs[0].Schema(), recs)
err = f.Sync()
if err != nil {
- t.Fatalf("could not sync data to disk: %v", err)
+ t.Fatal(err)
}
_, err = f.Seek(0, io.SeekStart)
if err != nil {
- t.Fatalf("could not seek to start: %v", err)
+ t.Fatal(err)
+ }
+
+ o, err := ioutil.TempFile("", "arrow-ipc-")
+ if err != nil {
+ t.Fatal(err)
}
+ defer os.Remove(o.Name())
- {
- r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer r.Release()
+ err = processStream(o, f)
+ if err != nil {
+ t.Fatal(err)
+ }
- n := 0
- for r.Next() {
- rec := r.Record()
- if !cmpRecs(rec, recs[n]) {
- t.Fatalf("records[%d] differ", n)
- }
- n++
- }
+ err = o.Sync()
+ if err != nil {
+ t.Fatal(err)
+ }
- if len(recs) != n {
- t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
- }
+ _, err = o.Seek(0, io.SeekStart)
+ if err != nil {
+ t.Fatal(err)
}
+
+ arrdata.CheckArrowFile(t, o, mem, recs[0].Schema(), recs)
})
}
}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 9f12384..5de7efb 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -45,6 +45,9 @@ type FileReader struct {
schema *arrow.Schema
record array.Record
+
+ irec int // current record index. used for the arrio.Reader interface
+ err error // last error
}
// NewFileReader opens an Arrow file using the provided reader r.
@@ -277,6 +280,25 @@ func (f *FileReader) Record(i int) (array.Record, error) {
return f.record, nil
}
+// Read reads the current record from the underlying stream and returns it, together with an error, if any.
+// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+//
+// The returned record value is valid until the next call to Read.
+// Users need to call Retain on that Record to keep it valid for longer.
+func (f *FileReader) Read() (rec array.Record, err error) {
+ if f.irec == f.NumRecords() {
+ return nil, io.EOF
+ }
+ rec, f.err = f.Record(f.irec)
+ f.irec++
+ return rec, f.err
+}
+
+// ReadAt reads the i-th record from the underlying stream and returns it, together with an error, if any.
+func (f *FileReader) ReadAt(i int64) (array.Record, error) {
+ return f.Record(int(i))
+}
+
func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
@@ -475,8 +497,7 @@ func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
}
func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
- field, buffers := ctx.loadCommon(2)
- buffers = append(buffers, ctx.buffer())
+ field, buffers := ctx.loadCommon(1)
sub := ctx.loadChild(dt.Elem())
defer sub.Release()
diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go
index 136b6a7..98e1bde 100644
--- a/go/arrow/ipc/file_test.go
+++ b/go/arrow/ipc/file_test.go
@@ -17,13 +17,11 @@
package ipc_test
import (
- "io"
"io/ioutil"
"os"
"testing"
"github.com/apache/arrow/go/arrow/internal/arrdata"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -40,62 +38,8 @@ func TestFile(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())
- {
- w, err := ipc.NewFileWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer w.Close()
-
- for i, rec := range recs {
- err = w.Write(rec)
- if err != nil {
- t.Fatalf("could not write record[%d]: %v", i, err)
- }
- }
-
- err = w.Close()
- if err != nil {
- t.Fatal(err)
- }
-
- err = f.Sync()
- if err != nil {
- t.Fatalf("could not sync data to disk: %v", err)
- }
-
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- t.Fatalf("could not seek to start: %v", err)
- }
- }
-
- {
- r, err := ipc.NewFileReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer r.Close()
-
- if got, want := r.NumRecords(), len(recs); got != want {
- t.Fatalf("invalid number of records. got=%d, want=%d", got, want)
- }
-
- for i := 0; i < r.NumRecords(); i++ {
- rec, err := r.Record(i)
- if err != nil {
- t.Fatalf("could not read record %d: %v", i, err)
- }
- if !cmpRecs(rec, recs[i]) {
- t.Fatalf("records[%d] differ", i)
- }
- }
-
- err = r.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
+ arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
+ arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), recs)
})
}
}
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 470baf2..e688974 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -20,6 +20,7 @@ import (
"io"
"github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/arrio"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -101,3 +102,12 @@ func WithSchema(schema *arrow.Schema) Option {
cfg.schema = schema
}
}
+
+var (
+ _ arrio.Reader = (*Reader)(nil)
+ _ arrio.Writer = (*Writer)(nil)
+ _ arrio.Reader = (*FileReader)(nil)
+ _ arrio.Writer = (*FileWriter)(nil)
+
+ _ arrio.ReaderAt = (*FileReader)(nil)
+)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
deleted file mode 100644
index 01be713..0000000
--- a/go/arrow/ipc/ipc_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ipc_test
-
-import (
- "fmt"
- "io"
- "strings"
-
- "github.com/apache/arrow/go/arrow/array"
-)
-
-func cmpRecs(r1, r2 array.Record) bool {
- // FIXME(sbinet): impl+use arrow.Record.Equal ?
-
- if !r1.Schema().Equal(r2.Schema()) {
- return false
- }
- if r1.NumCols() != r2.NumCols() {
- return false
- }
- if r1.NumRows() != r2.NumRows() {
- return false
- }
-
- var (
- txt1 = new(strings.Builder)
- txt2 = new(strings.Builder)
- )
-
- printRec(txt1, r1)
- printRec(txt2, r2)
-
- return txt1.String() == txt2.String()
-}
-
-func printRec(w io.Writer, rec array.Record) {
- for i, col := range rec.Columns() {
- fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col)
- }
-}
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 794a643..7b4e3da 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -201,7 +201,8 @@ type fieldVisitor struct {
 	meta map[string]string
 }
 
-func (fv *fieldVisitor) visit(dt arrow.DataType) {
+func (fv *fieldVisitor) visit(field arrow.Field) {
+	dt := field.Type
 	switch dt := dt.(type) {
 	case *arrow.NullType:
 		fv.dtype = flatbuf.TypeNull
@@ -309,7 +310,10 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 	case *arrow.TimestampType:
 		fv.dtype = flatbuf.TypeTimestamp
 		unit := unitToFB(dt.Unit)
-		tz := fv.b.CreateString(dt.TimeZone)
+		var tz flatbuffers.UOffsetT
+		if dt.TimeZone != "" {
+			tz = fv.b.CreateString(dt.TimeZone)
+		}
 		flatbuf.TimestampStart(fv.b)
 		flatbuf.TimestampAddUnit(fv.b, unit)
 		flatbuf.TimestampAddTimezone(fv.b, tz)
@@ -330,13 +334,13 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 	case *arrow.ListType:
 		fv.dtype = flatbuf.TypeList
-		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem()}, fv.memo))
+		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
 		flatbuf.ListStart(fv.b)
 		fv.offset = flatbuf.ListEnd(fv.b)
 
 	case *arrow.FixedSizeListType:
 		fv.dtype = flatbuf.TypeFixedSizeList
-		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem()}, fv.memo))
+		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
 		flatbuf.FixedSizeListStart(fv.b)
 		flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
 		fv.offset = flatbuf.FixedSizeListEnd(fv.b)
@@ -369,7 +373,7 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 func (fv *fieldVisitor) result(field arrow.Field) flatbuffers.UOffsetT {
 	nameFB := fv.b.CreateString(field.Name)
 
-	fv.visit(field.Type)
+	fv.visit(field)
 
 	flatbuf.FieldStartChildrenVector(fv.b, len(fv.kids))
 	for i := len(fv.kids) - 1; i >= 0; i-- {
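
Two consequences of this metadata change are worth spelling out: an empty time zone is no longer interned as a flatbuffer string, and the synthesized "item" child of a list or fixed-size list now inherits its parent's nullability. A small illustration of the latter (the field name "xs" is arbitrary; arrow.ListOf is the regular list-type constructor):

    package main

    import "github.com/apache/arrow/go/arrow"

    func main() {
        // Previously the IPC metadata always encoded the implicit "item"
        // child as non-nullable; it now inherits Nullable from its parent.
        field := arrow.Field{
            Name:     "xs",
            Type:     arrow.ListOf(arrow.PrimitiveTypes.Int32),
            Nullable: true,
        }
        _ = field
    }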
diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go
index 919c7a5..627e67a 100644
--- a/go/arrow/ipc/reader.go
+++ b/go/arrow/ipc/reader.go
@@ -179,6 +179,24 @@ func (r *Reader) Record() array.Record {
return r.rec
}
+// Read reads the current record from the underlying stream and an error, if any.
+// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+func (r *Reader) Read() (array.Record, error) {
+ if r.rec != nil {
+ r.rec.Release()
+ r.rec = nil
+ }
+
+ if !r.next() {
+ if r.done {
+ return nil, io.EOF
+ }
+ return nil, r.err
+ }
+
+ return r.rec, nil
+}
+
var (
_ array.RecordReader = (*Reader)(nil)
)
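
The new Read method supports the usual io-style consumption loop. A hypothetical caller (rdr built with ipc.NewReader; process is a placeholder):

    for {
        rec, err := rdr.Read()
        if err == io.EOF {
            break // stream exhausted
        }
        if err != nil {
            log.Fatal(err)
        }
        process(rec) // note: rec is released by the next call to Read
    }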
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/stream_test.go
index 64ea6cc..65d2c72 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/stream_test.go
@@ -23,7 +23,6 @@ import (
"testing"
"github.com/apache/arrow/go/arrow/internal/arrdata"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -40,22 +39,7 @@ func TestStream(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())
- {
- w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- defer w.Close()
-
- for i, rec := range recs {
- err = w.Write(rec)
- if err != nil {
- t.Fatalf("could not write record[%d]: %v", i, err)
- }
- }
-
- err = w.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
+ arrdata.WriteStream(t, f, mem, recs[0].Schema(), recs)
err = f.Sync()
if err != nil {
@@ -67,26 +51,7 @@ func TestStream(t *testing.T) {
t.Fatalf("could not seek to start: %v", err)
}
- {
- r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
- if err != nil {
- t.Fatal(err)
- }
- defer r.Release()
-
- n := 0
- for r.Next() {
- rec := r.Record()
- if !cmpRecs(rec, recs[n]) {
- t.Fatalf("records[%d] differ", n)
- }
- n++
- }
-
- if len(recs) != n {
- t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
- }
- }
+ arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs)
})
}
}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e526b65..3561598 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -76,6 +76,13 @@ func NewWriter(w io.Writer, opts ...Option) *Writer {
 }
 
 func (w *Writer) Close() error {
+	if !w.started {
+		err := w.start()
+		if err != nil {
+			return err
+		}
+	}
+
 	if w.pw == nil {
 		return nil
 	}
@@ -272,7 +279,14 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 		switch {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
-			panic("not implemented") // FIXME(sbinet) writer.cc:264
+			// slice data buffer to include the range we need now.
+			var (
+				beg = int64(arr.ValueOffset(0))
+				len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+			)
+			data = array.NewSliceData(data, beg, beg+len)
+			defer data.Release()
+			values = data.Buffers()[2]
 		default:
 			if values != nil {
 				values.Retain()
@@ -297,7 +311,14 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 		switch {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
-			panic("not implemented") // FIXME(sbinet) writer.cc:264
+			// slice data buffer to include the range we need now.
+			var (
+				beg = int64(arr.ValueOffset(0))
+				len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+			)
+			data = array.NewSliceData(data, beg, beg+len)
+			defer data.Release()
+			values = data.Buffers()[2]
 		default:
 			if values != nil {
 				values.Retain()
@@ -357,36 +378,17 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 	case *arrow.FixedSizeListType:
 		arr := arr.(*array.FixedSizeList)
 
-		voffsets, err := w.getZeroBasedValueOffsets(arr)
-		if err != nil {
-			return errors.Wrapf(err, "could not retrieve zero-based value offsets for array %T", arr)
-		}
-		p.body = append(p.body, voffsets)
 
 		w.depth--
-		var (
-			values        = arr.ListValues()
-			mustRelease   = false
-			values_offset int64
-			values_length int64
-		)
-		defer func() {
-			if mustRelease {
-				values.Release()
-			}
-		}()
-
-		if voffsets != nil {
-			values_offset = int64(arr.Offsets()[0])
-			values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
-		}
+		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
+		beg := int64(arr.Offset()) * size
+		end := int64(arr.Offset()+arr.Len()) * size
 
-		if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
-			// must also slice the values
-			values = array.NewSlice(values, values_offset, values_length)
-			mustRelease = true
-		}
-		err = w.visit(p, values)
+		values := array.NewSlice(arr.ListValues(), beg, end)
+		defer values.Release()
+
+		err := w.visit(p, values)
 		if err != nil {
 			return errors.Wrapf(err, "could not visit list element for array %T", arr)
@@ -407,6 +409,9 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr array.Interface) (*memory.B
 		// FIXME(sbinet): writer.cc:231
 		panic(fmt.Errorf("not implemented offset=%d", data.Offset()))
 	}
+	if voffsets == nil || voffsets.Len() == 0 {
+		return nil, nil
+	}
 
 	voffsets.Retain()
 	return voffsets, nil
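
A few notes on the writer changes. The Close change starts an unstarted writer first, so a stream carrying zero records still emits its schema message (presumably what start writes) rather than nothing at all. The truncation branches rely on two helpers defined outside this hunk; assuming the usual definitions (take the smaller int64, round up to the Arrow buffer alignment), they would read:

    // Assumed definitions; kArrowAlignment is the package's buffer-alignment constant.
    func minI64(a, b int64) int64 {
        if a < b {
            return a
        }
        return b
    }

    func paddedLength(n, alignment int64) int64 {
        return ((n + alignment - 1) / alignment) * alignment
    }

The FixedSizeList change drops the offsets-based slicing (fixed-size lists carry no offsets buffer) in favor of plain arithmetic: for a fixed-size list of size 3 with arr.Offset() == 2 and arr.Len() == 4, exactly the child values [6, 18) are sliced and visited.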
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 7b8e562..a4763c9 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -1234,6 +1234,24 @@ class IntegrationRunner(object):
                       ' for JS once JS supports them')
                 continue
 
+            if ('Go' in (producer.name, consumer.name) and
+                    "decimal" in test_case.name):
+                print('TODO(ARROW-3676): Enable decimal tests' +
+                      ' for Go')
+                continue
+
+            if ('Go' in (producer.name, consumer.name) and
+                    "map" in test_case.name):
+                print('TODO(ARROW-3679): Enable map tests' +
+                      ' for Go')
+                continue
+
+            if ('Go' in (producer.name, consumer.name) and
+                    "dictionary" in test_case.name):
+                print('TODO(ARROW-3039): Enable dictionary tests' +
+                      ' for Go')
+                continue
+
             # Make the random access file
             producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
                                               name + '.json_as_file')
@@ -1586,6 +1604,57 @@ class JSTester(Tester):
         os.system(cmd)
 
 
+class GoTester(Tester):
+    PRODUCER = True
+    CONSUMER = True
+
+    # FIXME(sbinet): revisit for Go modules
+    GOPATH = os.getenv('GOPATH', '~/go')
+    GOBIN = os.environ.get('GOBIN', os.path.join(GOPATH, 'bin'))
+
+    GO_INTEGRATION_EXE = os.path.join(GOBIN, 'arrow-json-integration-test')
+    STREAM_TO_FILE = os.path.join(GOBIN, 'arrow-stream-to-file')
+    FILE_TO_STREAM = os.path.join(GOBIN, 'arrow-file-to-stream')
+
+    name = 'Go'
+
+    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
+        cmd = [self.GO_INTEGRATION_EXE]
+
+        if arrow_path is not None:
+            cmd.extend(['-arrow', arrow_path])
+
+        if json_path is not None:
+            cmd.extend(['-json', json_path])
+
+        cmd.extend(['-mode', command])
+
+        if self.debug:
+            print(' '.join(cmd))
+
+        run_cmd(cmd)
+
+    def validate(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'VALIDATE')
+
+    def json_to_file(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+
+    def stream_to_file(self, stream_path, file_path):
+        cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
+    def file_to_stream(self, file_path, stream_path):
+        cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
+
 def get_static_json_files():
     glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
     return [JsonFile(name=os.path.basename(p), path=p, skip=set(),
@@ -1605,6 +1674,9 @@ def run_all_tests(args):
     if args.enable_js:
         testers.append(JSTester(args))
 
+    if args.enable_go:
+        testers.append(GoTester(args))
+
     static_json_files = get_static_json_files()
     generated_json_files = get_generated_json_files(tempdir=args.tempdir,
                                                     flight=args.run_flight)
@@ -1661,6 +1733,9 @@ if __name__ == '__main__':
     parser.add_argument('--enable-js', dest='enable_js',
                         action='store', type=int, default=1,
                         help='Include JavaScript in integration tests')
+    parser.add_argument('--enable-go', dest='enable_go',
+                        action='store', type=int, default=1,
+                        help='Include Go in integration tests')
     parser.add_argument('--write_generated_json', dest='generated_json_path',
                         action='store', default=False,