You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/22 20:03:52 UTC

[arrow] branch master updated: ARROW-5493: [Go][Integration] add Go support for IPC integration tests

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba2763  ARROW-5493: [Go][Integration] add Go support for IPC integration tests
4ba2763 is described below

commit 4ba2763150459c9eb4139e5954d9b5526b8ef0ee
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sat Jun 22 15:03:45 2019 -0500

    ARROW-5493: [Go][Integration] add Go support for IPC integration tests
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4565 from sbinet/issue-5493 and squashes the following commits:
    
    0385a98b5 <Sebastien Binet> go/arrow/ipc/cmd/arrow-stream-to-file: cleanup
    e2d5a5018 <Sebastien Binet> go/arrow/internal/arrdata: remove spurious os.File.Sync on r/o file
    1823cec7d <Sebastien Binet> ci: run integration tests as part of Go changes
    fc31f3e65 <Sebastien Binet> go/arrow/ipc/cmd/arrow-json-integration-test: add unit test similar to test_integration.py
    d0f14dba6 <Sebastien Binet> go/arrow/{arrio,internal/arrdata}: export {Check,Write}{File,Stream} helper functions
    a0ec358de <Sebastien Binet> go/arrow{ipc,internal/arrjson}: inherit nullable attribute for {FixedSized}List.Item element
    c9132f7b7 <Sebastien Binet> go/arrow/ipc: do not encode Timestamp.TimeZone when empty
    8ba7a7556 <Sebastien Binet> go/arrow: handle FixedSizeList array with no offsets
    10c08bb31 <Sebastien Binet> integration: handle no GOPATH case
    93d499106 <Sebastien Binet> go/arrow: display TimeZone as part of TimestampType Stringer implementation
    b82ed9731 <Sebastien Binet> go/arrow/ipc: handle empty streams
    90d79fdc3 <Sebastien Binet> integration: skip know to fail Go tests
    4bc504d50 <Sebastien Binet> ci: add Go-1.12.x to integration build
    9f2e4662a <Sebastien Binet> integration: linting
    08e51cc2a <Sebastien Binet> ci,integration: adapt for GO111MODULE=on
    73ec80ce3 <Sebastien Binet> dev: add Go tests for release-candidate
    51c6a9ff8 <Sebastien Binet> ci: build Go binaries for integration tests
    c1b8586d4 <Sebastien Binet> go/arrow/{,ipc,internal/arrdata}: add tests for all Timestamps (s,ms,us,ns)
    ae57bac2f <Sebastien Binet> go/arrow/internal/arrjson: implement r/w JSON for Interval and Duration arrays
    e5542d541 <Sebastien Binet> go: introduce arrow/arrio package, mirroring stdlib io package
    ff953f77c <Sebastien Binet> go: use array.RecordEqual
    e8d4de7e1 <Sebastien Binet> go/arrow/internal/arrjson: implement r/w roundtrip
    094e9d00f <Sebastien Binet> go/arrow/internal/arrjson: handle hex-string decoding for FixedSizeBinary
    043d53763 <Sebastien Binet> go/arrow/array: fixed sub-builders of FixedSizeListBuilder ref-count
    b30dc7886 <Sebastien Binet> go/arrow/ipc/cmd/arrow-json-integration-test: first import
    16e13e2ba <Sebastien Binet> go/arrow/internal/arrjson: first import
    de2ce54e9 <Sebastien Binet> go/arrow/ipc/cmd/arrow-{file,stream}-to-{stream,file}: first import
    d3e51f335 <Sebastien Binet> go/arrow/ipc: introduce Record{Reader{,At},Writer} interfaces and Copy{,N} functions
    ee4c500c6 <Sebastien Binet> ipc/cmd/arrow-{cat,ls}: simplify memory allocator
    d9f077b25 <Sebastien Binet> ARROW-5493:  add Go support for IPC integration tests
---
 ci/detect-changes.py                               |    1 +
 ci/travis_env_common.sh                            |    1 +
 ci/travis_script_integration.sh                    |   14 +
 dev/release/verify-release-candidate.sh            |   13 +
 go/arrow/array/array.go                            |    6 +-
 go/arrow/array/array_test.go                       |    2 +-
 go/arrow/array/fixed_size_list.go                  |   63 +-
 go/arrow/array/fixed_size_list_test.go             |   12 +-
 go/arrow/arrio/arrio.go                            |   91 ++
 go/arrow/arrio/arrio_test.go                       |  201 +++
 go/arrow/datatype_fixedwidth.go                    |   27 +-
 go/arrow/example_test.go                           |   29 +-
 go/arrow/go.mod                                    |    2 +
 go/arrow/internal/arrdata/arrdata.go               |   77 +-
 go/arrow/internal/arrdata/ioutil.go                |  143 ++
 go/arrow/internal/arrjson/arrjson.go               | 1464 ++++++++++++++++++++
 go/arrow/internal/arrjson/arrjson_test.go          |  110 ++
 .../{ipc/ipc.go => internal/arrjson/option.go}     |   48 +-
 go/arrow/internal/arrjson/reader.go                |  100 ++
 go/arrow/internal/arrjson/writer.go                |  110 ++
 go/arrow/ipc/cmd/arrow-cat/main.go                 |    7 +-
 go/arrow/ipc/cmd/arrow-cat/main_test.go            |   81 +-
 go/arrow/ipc/cmd/arrow-file-to-stream/main.go      |   83 ++
 .../arrow-file-to-stream/main_test.go}             |   56 +-
 .../ipc/cmd/arrow-json-integration-test/main.go    |  226 +++
 .../arrow-json-integration-test/main_test.go}      |   78 +-
 go/arrow/ipc/cmd/arrow-ls/main.go                  |    6 +-
 go/arrow/ipc/cmd/arrow-ls/main_test.go             |    7 +-
 go/arrow/ipc/cmd/arrow-stream-to-file/main.go      |   71 +
 .../arrow-stream-to-file/main_test.go}             |   62 +-
 go/arrow/ipc/file_reader.go                        |   25 +-
 go/arrow/ipc/file_test.go                          |   60 +-
 go/arrow/ipc/ipc.go                                |   10 +
 go/arrow/ipc/ipc_test.go                           |   55 -
 go/arrow/ipc/metadata.go                           |   14 +-
 go/arrow/ipc/reader.go                             |   18 +
 go/arrow/ipc/stream_test.go                        |   39 +-
 go/arrow/ipc/writer.go                             |   61 +-
 integration/integration_test.py                    |   75 +
 39 files changed, 3055 insertions(+), 493 deletions(-)

diff --git a/ci/detect-changes.py b/ci/detect-changes.py
index 1a30eb5..9194502 100644
--- a/ci/detect-changes.py
+++ b/ci/detect-changes.py
@@ -141,6 +141,7 @@ AFFECTED_DEPENDENCIES = {
     'ci': ALL_TOPICS,
     'cpp': ['python', 'c_glib', 'r', 'ruby', 'integration'],
     'format': LANGUAGE_TOPICS,
+    'go': ['integration'],
     '.travis.yml': ALL_TOPICS,
     'c_glib': ['ruby']
 }
diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh
index 922b37a..453cd94 100755
--- a/ci/travis_env_common.sh
+++ b/ci/travis_env_common.sh
@@ -38,6 +38,7 @@ export ARROW_INTEGRATION_DIR=$TRAVIS_BUILD_DIR/integration
 export ARROW_DEV_DIR=$TRAVIS_BUILD_DIR/dev
 export ARROW_CROSSBOW_DIR=$TRAVIS_BUILD_DIR/dev/tasks
 export ARROW_RUBY_DIR=$TRAVIS_BUILD_DIR/ruby
+export ARROW_GO_DIR=${TRAVIS_BUILD_DIR}/go
 export ARROW_RUST_DIR=${TRAVIS_BUILD_DIR}/rust
 export ARROW_R_DIR=${TRAVIS_BUILD_DIR}/r
 
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index c51005d..c043189 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -40,6 +40,20 @@ npm run build -- -t apache-arrow
 
 popd
 
+pushd $ARROW_GO_DIR/arrow
+
+eval $(gimme 1.12.x)
+
+export GO111MODULE=on
+export GOBIN=`pwd`/bin
+
+which go
+go version
+go env
+go get -v ./...
+
+popd
+
 pushd $ARROW_INTEGRATION_DIR
 
 conda activate $CPP_TOOLCHAIN
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index 80daa42..f14c99d 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -370,6 +370,15 @@ test_rust() {
   popd
 }
 
+test_go() {
+  pushd go
+
+  go get -v ./...
+  go test ./...
+
+  popd
+}
+
 # Build and test Java (Requires newer Maven -- I used 3.3.9)
 
 test_package_java() {
@@ -424,6 +433,7 @@ import_gpg_keys
 : ${TEST_PYTHON:=${TEST_DEFAULT}}
 : ${TEST_JS:=${TEST_DEFAULT}}
 : ${TEST_INTEGRATION:=${TEST_DEFAULT}}
+: ${TEST_GO:=${TEST_DEFAULT}}
 : ${TEST_RUST:=${TEST_DEFAULT}}
 : ${TEST_BINARY:=${TEST_DEFAULT}}
 : ${TEST_APT:=${TEST_DEFAULT}}
@@ -472,6 +482,9 @@ if [ "$ARTIFACT" == "source" ]; then
   if [ ${TEST_INTEGRATION} -gt 0 ]; then
     test_integration
   fi
+  if [ ${TEST_GO} -gt 0 ]; then
+    test_go
+  fi
   if [ ${TEST_RUST} -gt 0 ]; then
     test_rust
   fi
diff --git a/go/arrow/array/array.go b/go/arrow/array/array.go
index d8418f8..c357733 100644
--- a/go/arrow/array/array.go
+++ b/go/arrow/array/array.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package array
+package array // import "github.com/apache/arrow/go/arrow/array"
 
 import (
 	"sync/atomic"
@@ -130,6 +130,10 @@ func (a *array) setData(data *Data) {
 	a.data = data
 }
 
+func (a *array) Offset() int {
+	return a.data.Offset()
+}
+
 type arrayConstructorFn func(*Data) Interface
 
 var (
diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index ba3a961..590fa3c 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -79,7 +79,7 @@ func TestMakeFromData(t *testing.T) {
 			array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
 		}},
 
-		{name: "fixed_size_list", d: &testDataType{arrow.FIXED_SIZE_LIST}, child: []*array.Data{
+		{name: "fixed_size_list", d: arrow.FixedSizeListOf(4, arrow.PrimitiveTypes.Int64), child: []*array.Data{
 			array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
 			array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
 		}},
diff --git a/go/arrow/array/fixed_size_list.go b/go/arrow/array/fixed_size_list.go
index 1145e4e..6c61513 100644
--- a/go/arrow/array/fixed_size_list.go
+++ b/go/arrow/array/fixed_size_list.go
@@ -30,9 +30,8 @@ import (
 // FixedSizeList represents an immutable sequence of N array values.
 type FixedSizeList struct {
 	array
-	n       int32
-	values  Interface
-	offsets []int32
+	n      int32
+	values Interface
 }
 
 // NewFixedSizeListData returns a new List array value, from data.
@@ -65,18 +64,17 @@ func (a *FixedSizeList) String() string {
 }
 
 func (a *FixedSizeList) newListValue(i int) Interface {
-	j := i + a.array.data.offset
-	beg := int64(a.offsets[j])
-	end := int64(a.offsets[j+1])
-	return NewSlice(a.values, beg, end)
+	n := int64(a.n)
+	off := int64(a.array.data.offset)
+	beg := (off + int64(i)) * n
+	end := (off + int64(i+1)) * n
+	sli := NewSlice(a.values, beg, end)
+	return sli
 }
 
 func (a *FixedSizeList) setData(data *Data) {
 	a.array.setData(data)
-	vals := data.buffers[1]
-	if vals != nil {
-		a.offsets = arrow.Int32Traits.CastFromBytes(vals.Bytes())
-	}
+	a.n = a.DataType().(*arrow.FixedSizeListType).Len()
 	a.values = MakeFromData(data.childData[0])
 }
 
@@ -102,8 +100,6 @@ func arrayEqualFixedSizeList(left, right *FixedSizeList) bool {
 // Len returns the number of elements in the array.
 func (a *FixedSizeList) Len() int { return a.array.Len() }
 
-func (a *FixedSizeList) Offsets() []int32 { return a.offsets }
-
 func (a *FixedSizeList) Retain() {
 	a.array.Retain()
 	a.values.Retain()
@@ -117,10 +113,9 @@ func (a *FixedSizeList) Release() {
 type FixedSizeListBuilder struct {
 	builder
 
-	etype   arrow.DataType // data type of the list's elements.
-	n       int32          // number of elements in the fixed-size list.
-	values  Builder        // value builder for the list's elements.
-	offsets *Int32Builder
+	etype  arrow.DataType // data type of the list's elements.
+	n      int32          // number of elements in the fixed-size list.
+	values Builder        // value builder for the list's elements.
 }
 
 // NewFixedSizeListBuilder returns a builder, using the provided memory allocator.
@@ -131,7 +126,6 @@ func NewFixedSizeListBuilder(mem memory.Allocator, n int32, etype arrow.DataType
 		etype:   etype,
 		n:       n,
 		values:  newBuilder(mem, etype),
-		offsets: NewInt32Builder(mem),
 	}
 }
 
@@ -145,31 +139,25 @@ func (b *FixedSizeListBuilder) Release() {
 			b.nullBitmap.Release()
 			b.nullBitmap = nil
 		}
+		if b.values != nil {
+			b.values.Release()
+			b.values = nil
+		}
 	}
-
-	b.values.Release()
-	b.offsets.Release()
-}
-
-func (b *FixedSizeListBuilder) appendNextOffset() {
-	b.offsets.Append(int32(b.values.Len()))
 }
 
 func (b *FixedSizeListBuilder) Append(v bool) {
 	b.Reserve(1)
 	b.unsafeAppendBoolToBitmap(v)
-	b.appendNextOffset()
 }
 
 func (b *FixedSizeListBuilder) AppendNull() {
 	b.Reserve(1)
 	b.unsafeAppendBoolToBitmap(false)
-	b.appendNextOffset()
 }
 
-func (b *FixedSizeListBuilder) AppendValues(offsets []int32, valid []bool) {
+func (b *FixedSizeListBuilder) AppendValues(valid []bool) {
 	b.Reserve(len(valid))
-	b.offsets.AppendValues(offsets, nil)
 	b.builder.unsafeAppendBoolsToBitmap(valid, len(valid))
 }
 
@@ -189,7 +177,6 @@ func (b *FixedSizeListBuilder) unsafeAppendBoolToBitmap(isValid bool) {
 
 func (b *FixedSizeListBuilder) init(capacity int) {
 	b.builder.init(capacity)
-	b.offsets.init(capacity + 1)
 }
 
 // Reserve ensures there is enough space for appending n elements
@@ -209,7 +196,6 @@ func (b *FixedSizeListBuilder) Resize(n int) {
 		b.init(n)
 	} else {
 		b.builder.resize(n, b.builder.init)
-		b.offsets.resize(n+1, b.offsets.init)
 	}
 }
 
@@ -226,9 +212,6 @@ func (b *FixedSizeListBuilder) NewArray() Interface {
 // NewListArray creates a List array from the memory buffers used by the builder and resets the FixedSizeListBuilder
 // so it can be used to build a new array.
 func (b *FixedSizeListBuilder) NewListArray() (a *FixedSizeList) {
-	if b.offsets.Len() != b.length+1 {
-		b.appendNextOffset()
-	}
 	data := b.newData()
 	a = NewFixedSizeListData(data)
 	data.Release()
@@ -239,19 +222,9 @@ func (b *FixedSizeListBuilder) newData() (data *Data) {
 	values := b.values.NewArray()
 	defer values.Release()
 
-	var offsets *memory.Buffer
-	if b.offsets != nil {
-		arr := b.offsets.NewInt32Array()
-		defer arr.Release()
-		offsets = arr.Data().buffers[1]
-	}
-
 	data = NewData(
 		arrow.FixedSizeListOf(b.n, b.etype), b.length,
-		[]*memory.Buffer{
-			b.nullBitmap,
-			offsets,
-		},
+		[]*memory.Buffer{b.nullBitmap},
 		[]*Data{values.Data()},
 		b.nulls,
 		0,
diff --git a/go/arrow/array/fixed_size_list_test.go b/go/arrow/array/fixed_size_list_test.go
index afbf9e8..d72bc0b 100644
--- a/go/arrow/array/fixed_size_list_test.go
+++ b/go/arrow/array/fixed_size_list_test.go
@@ -33,7 +33,6 @@ func TestFixedSizeListArray(t *testing.T) {
 		vs      = []int32{0, 1, 2, 3, 4, 5, 6}
 		lengths = []int{3, 0, 4}
 		isValid = []bool{true, false, true}
-		offsets = []int32{0, 3, 3, 7}
 	)
 
 	lb := array.NewFixedSizeListBuilder(pool, int32(len(vs)), arrow.PrimitiveTypes.Int32)
@@ -75,10 +74,6 @@ func TestFixedSizeListArray(t *testing.T) {
 			}
 		}
 
-		if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) {
-			t.Fatalf("got=%v, want=%v", got, want)
-		}
-
 		varr := arr.ListValues().(*array.Int32)
 		if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) {
 			t.Fatalf("got=%v, want=%v", got, want)
@@ -107,7 +102,6 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
 		vs      = []int32{0, 1, 2, 3, 4, 5, 6}
 		lengths = []int{3, 0, 4}
 		isValid = []bool{true, false, true}
-		offsets = []int32{0, 3, 3, 7}
 	)
 
 	lb := array.NewFixedSizeListBuilder(pool, int32(len(vs)), arrow.PrimitiveTypes.Int32)
@@ -115,7 +109,7 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
 	vb := lb.ValueBuilder().(*array.Int32Builder)
 	vb.Reserve(len(vs))
 
-	lb.AppendValues(offsets, isValid)
+	lb.AppendValues(isValid)
 	for _, v := range vs {
 		vb.Append(v)
 	}
@@ -140,10 +134,6 @@ func TestFixedSizeListArrayBulkAppend(t *testing.T) {
 		}
 	}
 
-	if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) {
-		t.Fatalf("got=%v, want=%v", got, want)
-	}
-
 	varr := arr.ListValues().(*array.Int32)
 	if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) {
 		t.Fatalf("got=%v, want=%v", got, want)
diff --git a/go/arrow/arrio/arrio.go b/go/arrow/arrio/arrio.go
new file mode 100644
index 0000000..60cfb78
--- /dev/null
+++ b/go/arrow/arrio/arrio.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package arrio exposes functions to manipulate records, exposing and using
+// interfaces not unlike the ones defined in the stdlib io package.
+package arrio // import "github.com/apache/arrow/go/arrow/arrio"
+
+import (
+	"io"
+
+	"github.com/apache/arrow/go/arrow/array"
+)
+
+// Reader is the interface that wraps the Read method.
+type Reader interface {
+	// Read reads the current record from the underlying stream and an error, if any.
+	// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+	Read() (array.Record, error)
+}
+
+// ReaderAt is the interface that wraps the ReadAt method.
+type ReaderAt interface {
+	// ReadAt reads the i-th record from the underlying stream and an error, if any.
+	ReadAt(i int64) (array.Record, error)
+}
+
+// Writer is the interface that wraps the Write method.
+type Writer interface {
+	Write(rec array.Record) error
+}
+
+// Copy copies all the records available from src to dst.
+// Copy returns the number of records copied and the first error
+// encountered while copying, if any.
+//
+// A successful Copy returns err == nil, not err == EOF. Because Copy is
+// defined to read from src until EOF, it does not treat an EOF from Read as an
+// error to be reported.
+func Copy(dst Writer, src Reader) (n int64, err error) {
+	for {
+		rec, err := src.Read()
+		if err != nil {
+			if err == io.EOF {
+				return n, nil
+			}
+			return n, err
+		}
+		err = dst.Write(rec)
+		if err != nil {
+			return n, err
+		}
+		n++
+	}
+}
+
+// CopyN copies n records (or until an error) from src to dst. It returns the
+// number of records copied and the earliest error encountered while copying. On
+// return, written == n if and only if err == nil.
+func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
+	for ; written < n; written++ {
+		rec, err := src.Read()
+		if err != nil {
+			if err == io.EOF && written == n {
+				return written, nil
+			}
+			return written, err
+		}
+		err = dst.Write(rec)
+		if err != nil {
+			return written, err
+		}
+	}
+
+	if written != n && err == nil {
+		err = io.EOF
+	}
+	return written, err
+}
diff --git a/go/arrow/arrio/arrio_test.go b/go/arrow/arrio/arrio_test.go
new file mode 100644
index 0000000..74f7f73
--- /dev/null
+++ b/go/arrow/arrio/arrio_test.go
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrio_test
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/arrio"
+	"github.com/apache/arrow/go/arrow/internal/arrdata"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/apache/arrow/go/arrow/memory"
+)
+
+type copyKind int
+
+const (
+	fileKind copyKind = iota
+	streamKind
+)
+
+func (k copyKind) write(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	switch k {
+	case fileKind:
+		arrdata.WriteFile(t, f, mem, schema, recs)
+	case streamKind:
+		arrdata.WriteStream(t, f, mem, schema, recs)
+	default:
+		panic("invalid copyKind")
+	}
+}
+
+func (k copyKind) check(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	switch k {
+	case fileKind:
+		arrdata.CheckArrowFile(t, f, mem, schema, recs)
+	case streamKind:
+		arrdata.CheckArrowStream(t, f, mem, schema, recs)
+	default:
+		panic("invalid copyKind")
+	}
+}
+
+func TestCopy(t *testing.T) {
+	type kind int
+
+	for _, tc := range []struct {
+		name     string
+		src, dst copyKind
+	}{
+		{name: "file2file", src: fileKind, dst: fileKind},
+		{name: "file2stream", src: fileKind, dst: streamKind},
+		{name: "stream2file", src: streamKind, dst: fileKind},
+		{name: "stream2stream", src: streamKind, dst: streamKind},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			for name, recs := range arrdata.Records {
+				t.Run(name, func(t *testing.T) {
+					for _, tcopy := range []struct {
+						n    int
+						want int
+						err  error
+					}{
+						{-1, len(recs), nil},
+						{1, 1, nil},
+						{0, 0, nil},
+						{len(recs), len(recs), nil},
+						{len(recs) + 1, len(recs), io.EOF},
+					} {
+						t.Run(fmt.Sprintf("-copy-n=%d", tcopy.n), func(t *testing.T) {
+							mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+							defer mem.AssertSize(t, 0)
+
+							f, err := ioutil.TempFile("", "arrow-ipc-")
+							if err != nil {
+								t.Fatal(err)
+							}
+							defer f.Close()
+							defer os.Remove(f.Name())
+
+							o, err := ioutil.TempFile("", "arrow-ipc-")
+							if err != nil {
+								t.Fatal(err)
+							}
+							defer o.Close()
+							defer os.Remove(o.Name())
+
+							tc.src.write(t, f, mem, recs[0].Schema(), recs)
+							tc.src.check(t, f, mem, recs[0].Schema(), recs)
+
+							_, err = f.Seek(0, io.SeekStart)
+							if err != nil {
+								t.Fatal(err)
+							}
+
+							var r arrio.Reader
+							switch tc.src {
+							case fileKind:
+								rr, err := ipc.NewFileReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+								if err != nil {
+									t.Fatal(err)
+								}
+								defer rr.Close()
+								r = rr
+							case streamKind:
+								rr, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+								if err != nil {
+									t.Fatal(err)
+								}
+								defer rr.Release()
+								r = rr
+							default:
+								t.Fatalf("invalid src type %v", tc.src)
+							}
+
+							var w interface {
+								arrio.Writer
+								io.Closer
+							}
+
+							switch tc.dst {
+							case fileKind:
+								w, err = ipc.NewFileWriter(o, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+								if err != nil {
+									t.Fatal(err)
+								}
+							case streamKind:
+								w = ipc.NewWriter(o, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
+							default:
+								t.Fatalf("invalid dst type %v", tc.dst)
+							}
+							defer w.Close()
+
+							var (
+								n int64
+							)
+							switch tcopy.n {
+							case -1:
+								n, err = arrio.Copy(w, r)
+							case len(recs) + 1:
+								n, err = arrio.CopyN(w, r, int64(tcopy.n))
+							default:
+								n, err = arrio.CopyN(w, r, int64(tcopy.n))
+							}
+
+							switch err {
+							case nil:
+								if tcopy.err != nil {
+									t.Fatalf("got a nil error, want=%v", tcopy.err)
+								}
+							default:
+								switch tcopy.err {
+								case nil:
+									t.Fatalf("invalid error: got=%v, want=%v", err, tcopy.err)
+								default:
+									if tcopy.err.Error() != err.Error() {
+										t.Fatalf("invalid error: got=%v, want=%v", err, tcopy.err)
+									}
+								}
+							}
+
+							if got, want := n, int64(tcopy.want); got != want {
+								t.Fatalf("invalid number of records copied: got=%d, want=%d", got, want)
+							}
+
+							err = w.Close()
+							if err != nil {
+								t.Fatal(err)
+							}
+
+							tc.dst.check(t, o, mem, recs[0].Schema(), recs[:tcopy.want])
+						})
+					}
+				})
+			}
+		})
+	}
+}
diff --git a/go/arrow/datatype_fixedwidth.go b/go/arrow/datatype_fixedwidth.go
index 5f63ea1..79bb878 100644
--- a/go/arrow/datatype_fixedwidth.go
+++ b/go/arrow/datatype_fixedwidth.go
@@ -69,9 +69,16 @@ type TimestampType struct {
 	TimeZone string
 }
 
-func (*TimestampType) ID() Type         { return TIMESTAMP }
-func (*TimestampType) Name() string     { return "timestamp" }
-func (t *TimestampType) String() string { return "timestamp[" + t.Unit.String() + "]" }
+func (*TimestampType) ID() Type     { return TIMESTAMP }
+func (*TimestampType) Name() string { return "timestamp" }
+func (t *TimestampType) String() string {
+	switch len(t.TimeZone) {
+	case 0:
+		return "timestamp[" + t.Unit.String() + "]"
+	default:
+		return "timestamp[" + t.Unit.String() + ", tz=" + t.TimeZone + "]"
+	}
+}
 
 // BitWidth returns the number of bits required to store a single element of this data type in memory.
 func (*TimestampType) BitWidth() int { return 64 }
@@ -146,8 +153,8 @@ func (t *MonthIntervalType) BitWidth() int { return 32 }
 
 // DayTimeInterval represents a number of days and milliseconds (fraction of day).
 type DayTimeInterval struct {
-	Days         int32
-	Milliseconds int32
+	Days         int32 `json:"days"`
+	Milliseconds int32 `json:"milliseconds"`
 }
 
 // DayTimeIntervalType is encoded as a pair of 32-bit signed integer,
@@ -177,7 +184,10 @@ var (
 		Time32ms        FixedWidthDataType
 		Time64us        FixedWidthDataType
 		Time64ns        FixedWidthDataType
-		Timestamp       FixedWidthDataType
+		Timestamp_s     FixedWidthDataType
+		Timestamp_ms    FixedWidthDataType
+		Timestamp_us    FixedWidthDataType
+		Timestamp_ns    FixedWidthDataType
 	}{
 		Boolean:         &BooleanType{},
 		Date32:          &Date32Type{},
@@ -193,7 +203,10 @@ var (
 		Time32ms:        &Time32Type{Unit: Millisecond},
 		Time64us:        &Time64Type{Unit: Microsecond},
 		Time64ns:        &Time64Type{Unit: Nanosecond},
-		Timestamp:       &TimestampType{Unit: Nanosecond, TimeZone: "UTC"},
+		Timestamp_s:     &TimestampType{Unit: Second, TimeZone: "UTC"},
+		Timestamp_ms:    &TimestampType{Unit: Millisecond, TimeZone: "UTC"},
+		Timestamp_us:    &TimestampType{Unit: Microsecond, TimeZone: "UTC"},
+		Timestamp_ns:    &TimestampType{Unit: Nanosecond, TimeZone: "UTC"},
 	}
 
 	_ FixedWidthDataType = (*FixedSizeBinaryType)(nil)
diff --git a/go/arrow/example_test.go b/go/arrow/example_test.go
index de65d71..6413aee 100644
--- a/go/arrow/example_test.go
+++ b/go/arrow/example_test.go
@@ -224,6 +224,7 @@ func Example_fixedSizeListArray() {
 	vb.Append(2)
 
 	lb.AppendNull()
+	vb.AppendValues([]int64{-1, -1, -1}, nil)
 
 	lb.Append(true)
 	vb.Append(3)
@@ -242,41 +243,13 @@ func Example_fixedSizeListArray() {
 
 	fmt.Printf("NullN()   = %d\n", arr.NullN())
 	fmt.Printf("Len()     = %d\n", arr.Len())
-	fmt.Printf("Offsets() = %v\n", arr.Offsets())
 	fmt.Printf("Type()    = %v\n", arr.DataType())
-
-	offsets := arr.Offsets()[1:]
-
-	varr := arr.ListValues().(*array.Int64)
-
-	pos := 0
-	for i := 0; i < arr.Len(); i++ {
-		if !arr.IsValid(i) {
-			fmt.Printf("List[%d]   = (null)\n", i)
-			continue
-		}
-		fmt.Printf("List[%d]   = [", i)
-		for j := pos; j < int(offsets[i]); j++ {
-			if j != pos {
-				fmt.Printf(", ")
-			}
-			fmt.Printf("%v", varr.Value(j))
-		}
-		pos = int(offsets[i])
-		fmt.Printf("]\n")
-	}
 	fmt.Printf("List      = %v\n", arr)
 
 	// Output:
 	// NullN()   = 2
 	// Len()     = 5
-	// Offsets() = [0 3 3 6 9 9]
 	// Type()    = fixed_size_list<item: int64>[3]
-	// List[0]   = [0, 1, 2]
-	// List[1]   = (null)
-	// List[2]   = [3, 4, 5]
-	// List[3]   = [6, 7, 8]
-	// List[4]   = (null)
 	// List      = [[0 1 2] (null) [3 4 5] [6 7 8] (null)]
 }
 
diff --git a/go/arrow/go.mod b/go/arrow/go.mod
index e46257c..fa09504 100644
--- a/go/arrow/go.mod
+++ b/go/arrow/go.mod
@@ -16,6 +16,8 @@
 
 module github.com/apache/arrow/go/arrow
 
+go 1.12
+
 require (
 	github.com/davecgh/go-spew v1.1.0 // indirect
 	github.com/google/flatbuffers v1.11.0
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index 4216619..c249236 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -361,10 +361,16 @@ func makeStringsRecords() []array.Record {
 	return recs
 }
 
-type time32s arrow.Time32
-type time32ms arrow.Time32
-type time64ns arrow.Time64
-type time64us arrow.Time64
+type (
+	time32s      arrow.Time32
+	time32ms     arrow.Time32
+	time64ns     arrow.Time64
+	time64us     arrow.Time64
+	timestamp_s  arrow.Timestamp
+	timestamp_ms arrow.Timestamp
+	timestamp_us arrow.Timestamp
+	timestamp_ns arrow.Timestamp
+)
 
 func makeFixedWidthTypesRecords() []array.Record {
 	mem := memory.NewGoAllocator()
@@ -375,7 +381,10 @@ func makeFixedWidthTypesRecords() []array.Record {
 			arrow.Field{Name: "time32s", Type: arrow.FixedWidthTypes.Time32s, Nullable: true},
 			arrow.Field{Name: "time64ns", Type: arrow.FixedWidthTypes.Time64ns, Nullable: true},
 			arrow.Field{Name: "time64us", Type: arrow.FixedWidthTypes.Time64us, Nullable: true},
-			arrow.Field{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp, Nullable: true},
+			arrow.Field{Name: "timestamp_s", Type: arrow.FixedWidthTypes.Timestamp_s, Nullable: true},
+			arrow.Field{Name: "timestamp_ms", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true},
+			arrow.Field{Name: "timestamp_us", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
+			arrow.Field{Name: "timestamp_ns", Type: arrow.FixedWidthTypes.Timestamp_ns, Nullable: true},
 			arrow.Field{Name: "date32s", Type: arrow.FixedWidthTypes.Date32, Nullable: true},
 			arrow.Field{Name: "date64s", Type: arrow.FixedWidthTypes.Date64, Nullable: true},
 		}, nil,
@@ -397,7 +406,10 @@ func makeFixedWidthTypesRecords() []array.Record {
 			arrayOf(mem, []time32s{-2, -1, 0, +1, +2}, mask),
 			arrayOf(mem, []time64ns{-2, -1, 0, +1, +2}, mask),
 			arrayOf(mem, []time64us{-2, -1, 0, +1, +2}, mask),
-			arrayOf(mem, []arrow.Timestamp{0, +1, +2, +3, +4}, mask),
+			arrayOf(mem, []timestamp_s{0, +1, +2, +3, +4}, mask),
+			arrayOf(mem, []timestamp_ms{0, +1, +2, +3, +4}, mask),
+			arrayOf(mem, []timestamp_us{0, +1, +2, +3, +4}, mask),
+			arrayOf(mem, []timestamp_ns{0, +1, +2, +3, +4}, mask),
 			arrayOf(mem, []arrow.Date32{-2, -1, 0, +1, +2}, mask),
 			arrayOf(mem, []arrow.Date64{-2, -1, 0, +1, +2}, mask),
 		},
@@ -407,7 +419,10 @@ func makeFixedWidthTypesRecords() []array.Record {
 			arrayOf(mem, []time32s{-12, -11, 10, +11, +12}, mask),
 			arrayOf(mem, []time64ns{-12, -11, 10, +11, +12}, mask),
 			arrayOf(mem, []time64us{-12, -11, 10, +11, +12}, mask),
-			arrayOf(mem, []arrow.Timestamp{10, +11, +12, +13, +14}, mask),
+			arrayOf(mem, []timestamp_s{10, +11, +12, +13, +14}, mask),
+			arrayOf(mem, []timestamp_ms{10, +11, +12, +13, +14}, mask),
+			arrayOf(mem, []timestamp_us{10, +11, +12, +13, +14}, mask),
+			arrayOf(mem, []timestamp_ns{10, +11, +12, +13, +14}, mask),
 			arrayOf(mem, []arrow.Date32{-12, -11, 10, +11, +12}, mask),
 			arrayOf(mem, []arrow.Date64{-12, -11, 10, +11, +12}, mask),
 		},
@@ -417,7 +432,10 @@ func makeFixedWidthTypesRecords() []array.Record {
 			arrayOf(mem, []time32s{-22, -21, 20, +21, +22}, mask),
 			arrayOf(mem, []time64ns{-22, -21, 20, +21, +22}, mask),
 			arrayOf(mem, []time64us{-22, -21, 20, +21, +22}, mask),
-			arrayOf(mem, []arrow.Timestamp{20, +21, +22, +23, +24}, mask),
+			arrayOf(mem, []timestamp_s{20, +21, +22, +23, +24}, mask),
+			arrayOf(mem, []timestamp_ms{20, +21, +22, +23, +24}, mask),
+			arrayOf(mem, []timestamp_us{20, +21, +22, +23, +24}, mask),
+			arrayOf(mem, []timestamp_ns{20, +21, +22, +23, +24}, mask),
 			arrayOf(mem, []arrow.Date32{-22, -21, 20, +21, +22}, mask),
 			arrayOf(mem, []arrow.Date64{-22, -21, 20, +21, +22}, mask),
 		},
@@ -782,11 +800,48 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
 		bldr.AppendValues(vs, valids)
 		return bldr.NewArray()
 
-	case []arrow.Timestamp:
-		bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp.(*arrow.TimestampType))
+	case []timestamp_s:
+		bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_s.(*arrow.TimestampType))
 		defer bldr.Release()
 
-		bldr.AppendValues(a, valids)
+		vs := make([]arrow.Timestamp, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Timestamp(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []timestamp_ms:
+		bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_ms.(*arrow.TimestampType))
+		defer bldr.Release()
+
+		vs := make([]arrow.Timestamp, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Timestamp(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []timestamp_us:
+		bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_us.(*arrow.TimestampType))
+		defer bldr.Release()
+
+		vs := make([]arrow.Timestamp, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Timestamp(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []timestamp_ns:
+		bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_ns.(*arrow.TimestampType))
+		defer bldr.Release()
+
+		vs := make([]arrow.Timestamp, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Timestamp(v)
+		}
+		bldr.AppendValues(vs, valids)
 		return bldr.NewArray()
 
 	case []arrow.Date32:
diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
new file mode 100644
index 0000000..7065f64
--- /dev/null
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrdata // import "github.com/apache/arrow/go/arrow/internal/arrdata"
+
+import (
+	"io"
+	"os"
+	"testing"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/apache/arrow/go/arrow/memory"
+)
+
+// CheckArrowFile checks whether a given ARROW file contains the expected list of records.
+func CheckArrowFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	_, err := f.Seek(0, io.SeekStart)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	for i := 0; i < r.NumRecords(); i++ {
+		rec, err := r.Record(i)
+		if err != nil {
+			t.Fatalf("could not read record %d: %v", i, err)
+		}
+		if !array.RecordEqual(rec, recs[i]) {
+			t.Fatalf("records[%d] differ", i)
+		}
+	}
+
+	err = r.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+// CheckArrowStream checks whether a given ARROW stream contains the expected list of records.
+func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	_, err := f.Seek(0, io.SeekStart)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r, err := ipc.NewReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Release()
+
+	n := 0
+	for r.Next() {
+		rec := r.Record()
+		if !array.RecordEqual(rec, recs[n]) {
+			t.Fatalf("records[%d] differ", n)
+		}
+		n++
+	}
+
+	if len(recs) != n {
+		t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
+
+	}
+}
+
+// WriteFile writes a list of records to the given file descriptor, as an ARROW file.
+func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	w, err := ipc.NewFileWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer w.Close()
+
+	for i, rec := range recs {
+		err = w.Write(rec)
+		if err != nil {
+			t.Fatalf("could not write record[%d]: %v", i, err)
+		}
+	}
+
+	err = w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = f.Sync()
+	if err != nil {
+		t.Fatalf("could not sync data to disk: %v", err)
+	}
+
+	_, err = f.Seek(0, io.SeekStart)
+	if err != nil {
+		t.Fatalf("could not seek to start: %v", err)
+	}
+}
+
+// WriteStream writes a list of records to the given file descriptor, as an ARROW stream.
+func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	w := ipc.NewWriter(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+	defer w.Close()
+
+	for i, rec := range recs {
+		err := w.Write(rec)
+		if err != nil {
+			t.Fatalf("could not write record[%d]: %v", i, err)
+		}
+	}
+
+	err := w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/go/arrow/internal/arrjson/arrjson.go b/go/arrow/internal/arrjson/arrjson.go
new file mode 100644
index 0000000..11cc3d0
--- /dev/null
+++ b/go/arrow/internal/arrjson/arrjson.go
@@ -0,0 +1,1464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package arrjson provides types and functions to encode and decode ARROW types and data
+// to and from JSON files.
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+	"encoding/hex"
+	"encoding/json"
+	"strconv"
+	"strings"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/float16"
+	"github.com/apache/arrow/go/arrow/memory"
+	"github.com/pkg/errors"
+)
+
+const (
+	kData         = "DATA"
+	kDays         = "days"
+	kDayTime      = "DAY_TIME"
+	kDuration     = "duration"
+	kMilliseconds = "milliseconds"
+	kYearMonth    = "YEAR_MONTH"
+)
+
+type Schema struct {
+	Fields []Field `json:"fields"`
+}
+
+type Field struct {
+	Name     string   `json:"name"`
+	Type     dataType `json:"type"`
+	Nullable bool     `json:"nullable"`
+	Children []Field  `json:"children"`
+}
+
+type dataType struct {
+	Name      string `json:"name"`
+	Signed    bool   `json:"isSigned,omitempty"`
+	BitWidth  int    `json:"bitWidth,omitempty"`
+	Precision string `json:"precision,omitempty"`
+	ByteWidth int    `json:"byteWidth,omitempty"`
+	ListSize  int32  `json:"listSize,omitempty"`
+	Unit      string `json:"unit,omitempty"`
+	TimeZone  string `json:"timezone,omitempty"`
+	Scale     int    `json:"scale,omitempty"` // for Decimal128
+}
+
+func dtypeToJSON(dt arrow.DataType) dataType {
+	switch dt := dt.(type) {
+	case *arrow.BooleanType:
+		return dataType{Name: "bool"}
+	case *arrow.Int8Type:
+		return dataType{Name: "int", Signed: true, BitWidth: 8}
+	case *arrow.Int16Type:
+		return dataType{Name: "int", Signed: true, BitWidth: 16}
+	case *arrow.Int32Type:
+		return dataType{Name: "int", Signed: true, BitWidth: 32}
+	case *arrow.Int64Type:
+		return dataType{Name: "int", Signed: true, BitWidth: 64}
+	case *arrow.Uint8Type:
+		return dataType{Name: "int", BitWidth: 8}
+	case *arrow.Uint16Type:
+		return dataType{Name: "int", BitWidth: 16}
+	case *arrow.Uint32Type:
+		return dataType{Name: "int", BitWidth: 32}
+	case *arrow.Uint64Type:
+		return dataType{Name: "int", BitWidth: 64}
+	case *arrow.Float16Type:
+		return dataType{Name: "floatingpoint", Precision: "HALF"}
+	case *arrow.Float32Type:
+		return dataType{Name: "floatingpoint", Precision: "SINGLE"}
+	case *arrow.Float64Type:
+		return dataType{Name: "floatingpoint", Precision: "DOUBLE"}
+	case *arrow.BinaryType:
+		return dataType{Name: "binary"}
+	case *arrow.StringType:
+		return dataType{Name: "utf8"}
+	case *arrow.Date32Type:
+		return dataType{Name: "date", Unit: "DAY"}
+	case *arrow.Date64Type:
+		return dataType{Name: "date", Unit: "MILLISECOND"}
+	case *arrow.Time32Type:
+		switch dt.Unit {
+		case arrow.Second:
+			return dataType{Name: "time", Unit: "SECOND", BitWidth: dt.BitWidth()}
+		case arrow.Millisecond:
+			return dataType{Name: "time", Unit: "MILLISECOND", BitWidth: dt.BitWidth()}
+		}
+	case *arrow.Time64Type:
+		switch dt.Unit {
+		case arrow.Microsecond:
+			return dataType{Name: "time", Unit: "MICROSECOND", BitWidth: dt.BitWidth()}
+		case arrow.Nanosecond:
+			return dataType{Name: "time", Unit: "NANOSECOND", BitWidth: dt.BitWidth()}
+		}
+	case *arrow.TimestampType:
+		switch dt.Unit {
+		case arrow.Second:
+			return dataType{Name: "timestamp", Unit: "SECOND", TimeZone: dt.TimeZone}
+		case arrow.Millisecond:
+			return dataType{Name: "timestamp", Unit: "MILLISECOND", TimeZone: dt.TimeZone}
+		case arrow.Microsecond:
+			return dataType{Name: "timestamp", Unit: "MICROSECOND", TimeZone: dt.TimeZone}
+		case arrow.Nanosecond:
+			return dataType{Name: "timestamp", Unit: "NANOSECOND", TimeZone: dt.TimeZone}
+		}
+	case *arrow.MonthIntervalType:
+		return dataType{Name: "interval", Unit: "YEAR_MONTH"}
+	case *arrow.DayTimeIntervalType:
+		return dataType{Name: "interval", Unit: "DAY_TIME"}
+	case *arrow.DurationType:
+		switch dt.Unit {
+		case arrow.Second:
+			return dataType{Name: "duration", Unit: "SECOND"}
+		case arrow.Millisecond:
+			return dataType{Name: "duration", Unit: "MILLISECOND"}
+		case arrow.Microsecond:
+			return dataType{Name: "duration", Unit: "MICROSECOND"}
+		case arrow.Nanosecond:
+			return dataType{Name: "duration", Unit: "NANOSECOND"}
+		}
+
+	case *arrow.ListType:
+		return dataType{Name: "list"}
+	case *arrow.StructType:
+		return dataType{Name: "struct"}
+	case *arrow.FixedSizeListType:
+		return dataType{Name: "fixedsizelist", ListSize: dt.Len()}
+	case *arrow.FixedSizeBinaryType:
+		return dataType{
+			Name:      "fixedsizebinary",
+			ByteWidth: dt.ByteWidth,
+		}
+	}
+	panic(errors.Errorf("unknown arrow.DataType %v", dt))
+}
+
+func dtypeFromJSON(dt dataType, children []Field) arrow.DataType {
+	switch dt.Name {
+	case "bool":
+		return arrow.FixedWidthTypes.Boolean
+	case "int":
+		switch dt.Signed {
+		case true:
+			switch dt.BitWidth {
+			case 8:
+				return arrow.PrimitiveTypes.Int8
+			case 16:
+				return arrow.PrimitiveTypes.Int16
+			case 32:
+				return arrow.PrimitiveTypes.Int32
+			case 64:
+				return arrow.PrimitiveTypes.Int64
+			}
+		default:
+			switch dt.BitWidth {
+			case 8:
+				return arrow.PrimitiveTypes.Uint8
+			case 16:
+				return arrow.PrimitiveTypes.Uint16
+			case 32:
+				return arrow.PrimitiveTypes.Uint32
+			case 64:
+				return arrow.PrimitiveTypes.Uint64
+			}
+		}
+	case "floatingpoint":
+		switch dt.Precision {
+		case "HALF":
+			return arrow.FixedWidthTypes.Float16
+		case "SINGLE":
+			return arrow.PrimitiveTypes.Float32
+		case "DOUBLE":
+			return arrow.PrimitiveTypes.Float64
+		}
+	case "binary":
+		return arrow.BinaryTypes.Binary
+	case "utf8":
+		return arrow.BinaryTypes.String
+	case "date":
+		switch dt.Unit {
+		case "DAY":
+			return arrow.FixedWidthTypes.Date32
+		case "MILLISECOND":
+			return arrow.FixedWidthTypes.Date64
+		}
+	case "time":
+		switch dt.BitWidth {
+		case 32:
+			switch dt.Unit {
+			case "SECOND":
+				return arrow.FixedWidthTypes.Time32s
+			case "MILLISECOND":
+				return arrow.FixedWidthTypes.Time32ms
+			}
+		case 64:
+			switch dt.Unit {
+			case "MICROSECOND":
+				return arrow.FixedWidthTypes.Time64us
+			case "NANOSECOND":
+				return arrow.FixedWidthTypes.Time64ns
+			}
+		}
+	case "timestamp":
+		switch dt.Unit {
+		case "SECOND":
+			return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Second}
+		case "MILLISECOND":
+			return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Millisecond}
+		case "MICROSECOND":
+			return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Microsecond}
+		case "NANOSECOND":
+			return &arrow.TimestampType{TimeZone: dt.TimeZone, Unit: arrow.Nanosecond}
+		}
+	case "list":
+		return arrow.ListOf(dtypeFromJSON(children[0].Type, nil))
+	case "struct":
+		return arrow.StructOf(fieldsFromJSON(children)...)
+	case "fixedsizebinary":
+		return &arrow.FixedSizeBinaryType{ByteWidth: dt.ByteWidth}
+	case "fixedsizelist":
+		return arrow.FixedSizeListOf(dt.ListSize, dtypeFromJSON(children[0].Type, nil))
+	case "interval":
+		switch dt.Unit {
+		case "YEAR_MONTH":
+			return arrow.FixedWidthTypes.MonthInterval
+		case "DAY_TIME":
+			return arrow.FixedWidthTypes.DayTimeInterval
+		}
+	case "duration":
+		switch dt.Unit {
+		case "SECOND":
+			return arrow.FixedWidthTypes.Duration_s
+		case "MILLISECOND":
+			return arrow.FixedWidthTypes.Duration_ms
+		case "MICROSECOND":
+			return arrow.FixedWidthTypes.Duration_us
+		case "NANOSECOND":
+			return arrow.FixedWidthTypes.Duration_ns
+		}
+	}
+	panic(errors.Errorf("unknown DataType %#v", dt))
+}
+
+func schemaToJSON(schema *arrow.Schema) Schema {
+	return Schema{
+		Fields: fieldsToJSON(schema.Fields()),
+	}
+}
+
+func schemaFromJSON(schema Schema) *arrow.Schema {
+	return arrow.NewSchema(fieldsFromJSON(schema.Fields), nil)
+}
+
+func fieldsToJSON(fields []arrow.Field) []Field {
+	o := make([]Field, len(fields))
+	for i, f := range fields {
+		o[i] = Field{
+			Name:     f.Name,
+			Type:     dtypeToJSON(f.Type),
+			Nullable: f.Nullable,
+			Children: []Field{},
+		}
+		switch dt := f.Type.(type) {
+		case *arrow.ListType:
+			o[i].Children = fieldsToJSON([]arrow.Field{{Name: "item", Type: dt.Elem(), Nullable: f.Nullable}})
+		case *arrow.FixedSizeListType:
+			o[i].Children = fieldsToJSON([]arrow.Field{{Name: "item", Type: dt.Elem(), Nullable: f.Nullable}})
+		case *arrow.StructType:
+			o[i].Children = fieldsToJSON(dt.Fields())
+		}
+	}
+	return o
+}
+
+func fieldsFromJSON(fields []Field) []arrow.Field {
+	vs := make([]arrow.Field, len(fields))
+	for i, v := range fields {
+		vs[i] = fieldFromJSON(v)
+	}
+	return vs
+}
+
+func fieldFromJSON(f Field) arrow.Field {
+	return arrow.Field{
+		Name:     f.Name,
+		Type:     dtypeFromJSON(f.Type, f.Children),
+		Nullable: f.Nullable,
+	}
+}
+
+type Record struct {
+	Count   int64   `json:"count"`
+	Columns []Array `json:"columns"`
+}
+
+func recordsFromJSON(mem memory.Allocator, schema *arrow.Schema, recs []Record) []array.Record {
+	vs := make([]array.Record, len(recs))
+	for i, rec := range recs {
+		vs[i] = recordFromJSON(mem, schema, rec)
+	}
+	return vs
+}
+
+func recordFromJSON(mem memory.Allocator, schema *arrow.Schema, rec Record) array.Record {
+	arrs := arraysFromJSON(mem, schema, rec.Columns)
+	defer func() {
+		for _, arr := range arrs {
+			arr.Release()
+		}
+	}()
+	return array.NewRecord(schema, arrs, int64(rec.Count))
+}
+
+func recordToJSON(rec array.Record) Record {
+	return Record{
+		Count:   rec.NumRows(),
+		Columns: arraysToJSON(rec.Schema(), rec.Columns()),
+	}
+}
+
+type Array struct {
+	Name     string        `json:"name"`
+	Count    int           `json:"count"`
+	Valids   []int         `json:"VALIDITY,omitempty"`
+	Data     []interface{} `json:"DATA,omitempty"`
+	Offset   []int32       `json:"OFFSET,omitempty"`
+	Children []Array       `json:"children,omitempty"`
+}
+
+func arraysFromJSON(mem memory.Allocator, schema *arrow.Schema, arrs []Array) []array.Interface {
+	o := make([]array.Interface, len(arrs))
+	for i, v := range arrs {
+		o[i] = arrayFromJSON(mem, schema.Field(i).Type, v)
+	}
+	return o
+}
+
+func arraysToJSON(schema *arrow.Schema, arrs []array.Interface) []Array {
+	o := make([]Array, len(arrs))
+	for i, v := range arrs {
+		o[i] = arrayToJSON(schema.Field(i), v)
+	}
+	return o
+}
+
+func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) array.Interface {
+	switch dt := dt.(type) {
+	case *arrow.BooleanType:
+		bldr := array.NewBooleanBuilder(mem)
+		defer bldr.Release()
+		data := boolsFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Int8Type:
+		bldr := array.NewInt8Builder(mem)
+		defer bldr.Release()
+		data := i8FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Int16Type:
+		bldr := array.NewInt16Builder(mem)
+		defer bldr.Release()
+		data := i16FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Int32Type:
+		bldr := array.NewInt32Builder(mem)
+		defer bldr.Release()
+		data := i32FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Int64Type:
+		bldr := array.NewInt64Builder(mem)
+		defer bldr.Release()
+		data := i64FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Uint8Type:
+		bldr := array.NewUint8Builder(mem)
+		defer bldr.Release()
+		data := u8FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Uint16Type:
+		bldr := array.NewUint16Builder(mem)
+		defer bldr.Release()
+		data := u16FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Uint32Type:
+		bldr := array.NewUint32Builder(mem)
+		defer bldr.Release()
+		data := u32FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Uint64Type:
+		bldr := array.NewUint64Builder(mem)
+		defer bldr.Release()
+		data := u64FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Float16Type:
+		bldr := array.NewFloat16Builder(mem)
+		defer bldr.Release()
+		data := f16FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Float32Type:
+		bldr := array.NewFloat32Builder(mem)
+		defer bldr.Release()
+		data := f32FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Float64Type:
+		bldr := array.NewFloat64Builder(mem)
+		defer bldr.Release()
+		data := f64FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.StringType:
+		bldr := array.NewStringBuilder(mem)
+		defer bldr.Release()
+		data := strFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.BinaryType:
+		bldr := array.NewBinaryBuilder(mem, dt)
+		defer bldr.Release()
+		data := bytesFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.ListType:
+		bldr := array.NewListBuilder(mem, dt.Elem())
+		defer bldr.Release()
+		valids := validsFromJSON(arr.Valids)
+		elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0])
+		defer elems.Release()
+		for i, v := range valids {
+			bldr.Append(v)
+			beg := int64(arr.Offset[i])
+			end := int64(arr.Offset[i+1])
+			slice := array.NewSlice(elems, beg, end)
+			buildArray(bldr.ValueBuilder(), slice)
+			slice.Release()
+		}
+		return bldr.NewArray()
+
+	case *arrow.FixedSizeListType:
+		bldr := array.NewFixedSizeListBuilder(mem, dt.Len(), dt.Elem())
+		defer bldr.Release()
+		valids := validsFromJSON(arr.Valids)
+		elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0])
+		defer elems.Release()
+		size := int64(dt.Len())
+		for i, v := range valids {
+			bldr.Append(v)
+			beg := int64(i) * size
+			end := int64(i+1) * size
+			slice := array.NewSlice(elems, beg, end)
+			buildArray(bldr.ValueBuilder(), slice)
+			slice.Release()
+		}
+		return bldr.NewArray()
+
+	case *arrow.StructType:
+		bldr := array.NewStructBuilder(mem, dt)
+		defer bldr.Release()
+		valids := validsFromJSON(arr.Valids)
+		fields := make([]array.Interface, len(dt.Fields()))
+		for i := range fields {
+			fields[i] = arrayFromJSON(mem, dt.Field(i).Type, arr.Children[i])
+		}
+
+		bldr.AppendValues(valids)
+		for i := range dt.Fields() {
+			fbldr := bldr.FieldBuilder(i)
+			buildArray(fbldr, fields[i])
+			fields[i].Release()
+		}
+
+		return bldr.NewArray()
+
+	case *arrow.FixedSizeBinaryType:
+		bldr := array.NewFixedSizeBinaryBuilder(mem, dt)
+		defer bldr.Release()
+		strdata := strFromJSON(arr.Data)
+		data := make([][]byte, len(strdata))
+		for i, v := range strdata {
+			if len(v) != 2*dt.ByteWidth {
+				panic(errors.Errorf("arrjson: invalid hex-string length (got=%d, want=%d)", len(v), 2*dt.ByteWidth))
+			}
+			vv, err := hex.DecodeString(v)
+			if err != nil {
+				panic(err)
+			}
+			data[i] = vv
+		}
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Date32Type:
+		bldr := array.NewDate32Builder(mem)
+		defer bldr.Release()
+		data := date32FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Date64Type:
+		bldr := array.NewDate64Builder(mem)
+		defer bldr.Release()
+		data := date64FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Time32Type:
+		bldr := array.NewTime32Builder(mem, dt)
+		defer bldr.Release()
+		data := time32FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.Time64Type:
+		bldr := array.NewTime64Builder(mem, dt)
+		defer bldr.Release()
+		data := time64FromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.TimestampType:
+		bldr := array.NewTimestampBuilder(mem, dt)
+		defer bldr.Release()
+		data := timestampFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.MonthIntervalType:
+		bldr := array.NewMonthIntervalBuilder(mem)
+		defer bldr.Release()
+		data := monthintervalFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.DayTimeIntervalType:
+		bldr := array.NewDayTimeIntervalBuilder(mem)
+		defer bldr.Release()
+		data := daytimeintervalFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	case *arrow.DurationType:
+		bldr := array.NewDurationBuilder(mem, dt)
+		defer bldr.Release()
+		data := durationFromJSON(arr.Data)
+		valids := validsFromJSON(arr.Valids)
+		bldr.AppendValues(data, valids)
+		return bldr.NewArray()
+
+	default:
+		panic(errors.Errorf("unknown data type %v %T", dt, dt))
+	}
+	panic("impossible")
+}
+
+func arrayToJSON(field arrow.Field, arr array.Interface) Array {
+	switch arr := arr.(type) {
+	case *array.Boolean:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   boolsToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Int8:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   i8ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Int16:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   i16ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Int32:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   i32ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Int64:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   i64ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Uint8:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   u8ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Uint16:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   u16ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Uint32:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   u32ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Uint64:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   u64ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Float16:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   f16ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Float32:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   f32ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Float64:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   f64ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.String:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   strToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Binary:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   bytesToJSON(arr),
+			Valids: validsToJSON(arr),
+			Offset: arr.ValueOffsets(),
+		}
+
+	case *array.List:
+		o := Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Valids: validsToJSON(arr),
+			Offset: arr.Offsets(),
+			Children: []Array{
+				arrayToJSON(arrow.Field{Name: "item", Type: arr.DataType().(*arrow.ListType).Elem()}, arr.ListValues()),
+			},
+		}
+		return o
+
+	case *array.FixedSizeList:
+		o := Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Valids: validsToJSON(arr),
+			Children: []Array{
+				arrayToJSON(arrow.Field{Name: "", Type: arr.DataType().(*arrow.FixedSizeListType).Elem()}, arr.ListValues()),
+			},
+		}
+		return o
+
+	case *array.Struct:
+		dt := arr.DataType().(*arrow.StructType)
+		o := Array{
+			Name:     field.Name,
+			Count:    arr.Len(),
+			Valids:   validsToJSON(arr),
+			Children: make([]Array, len(dt.Fields())),
+		}
+		for i := range o.Children {
+			o.Children[i] = arrayToJSON(dt.Field(i), arr.Field(i))
+		}
+		return o
+
+	case *array.FixedSizeBinary:
+		dt := arr.DataType().(*arrow.FixedSizeBinaryType)
+		o := Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Valids: validsToJSON(arr),
+			Data:   make([]interface{}, arr.Len()),
+		}
+		for i := range o.Data {
+			v := []byte(strings.ToUpper(hex.EncodeToString(arr.Value(i))))
+			if len(v) != 2*dt.ByteWidth {
+				panic(errors.Errorf("arrjson: invalid hex-string length (got=%d, want=%d)", len(v), 2*dt.ByteWidth))
+			}
+			o.Data[i] = string(v) // re-convert as string to prevent json.Marshal from base64-encoding it.
+		}
+		return o
+
+	case *array.Date32:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   date32ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Date64:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   date64ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Time32:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   time32ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Time64:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   time64ToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	case *array.Timestamp:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   timestampToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+	case *array.MonthInterval:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   monthintervalToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+	case *array.DayTimeInterval:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   daytimeintervalToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+	case *array.Duration:
+		return Array{
+			Name:   field.Name,
+			Count:  arr.Len(),
+			Data:   durationToJSON(arr),
+			Valids: validsToJSON(arr),
+		}
+
+	default:
+		panic(errors.Errorf("unknown array type %T", arr))
+	}
+	panic("impossible")
+}
+
+func validsFromJSON(vs []int) []bool {
+	o := make([]bool, len(vs))
+	for i, v := range vs {
+		if v > 0 {
+			o[i] = true
+		}
+	}
+	return o
+}
+
+func validsToJSON(arr array.Interface) []int {
+	o := make([]int, arr.Len())
+	for i := range o {
+		if arr.IsValid(i) {
+			o[i] = 1
+		}
+	}
+	return o
+}
+
+func boolsFromJSON(vs []interface{}) []bool {
+	o := make([]bool, len(vs))
+	for i, v := range vs {
+		o[i] = v.(bool)
+	}
+	return o
+}
+
+func boolsToJSON(arr *array.Boolean) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func i8FromJSON(vs []interface{}) []int8 {
+	o := make([]int8, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = int8(vv)
+	}
+	return o
+}
+
+func i8ToJSON(arr *array.Int8) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func i16FromJSON(vs []interface{}) []int16 {
+	o := make([]int16, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = int16(vv)
+	}
+	return o
+}
+
+func i16ToJSON(arr *array.Int16) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func i32FromJSON(vs []interface{}) []int32 {
+	o := make([]int32, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = int32(vv)
+	}
+	return o
+}
+
+func i32ToJSON(arr *array.Int32) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func i64FromJSON(vs []interface{}) []int64 {
+	o := make([]int64, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = int64(vv)
+	}
+	return o
+}
+
+func i64ToJSON(arr *array.Int64) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func u8FromJSON(vs []interface{}) []uint8 {
+	o := make([]uint8, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = uint8(vv)
+	}
+	return o
+}
+
+func u8ToJSON(arr *array.Uint8) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func u16FromJSON(vs []interface{}) []uint16 {
+	o := make([]uint16, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = uint16(vv)
+	}
+	return o
+}
+
+func u16ToJSON(arr *array.Uint16) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func u32FromJSON(vs []interface{}) []uint32 {
+	o := make([]uint32, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = uint32(vv)
+	}
+	return o
+}
+
+func u32ToJSON(arr *array.Uint32) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func u64FromJSON(vs []interface{}) []uint64 {
+	o := make([]uint64, len(vs))
+	for i, v := range vs {
+		vv, err := strconv.ParseUint(v.(json.Number).String(), 10, 64)
+		if err != nil {
+			panic(err)
+		}
+		o[i] = uint64(vv)
+	}
+	return o
+}
+
+func u64ToJSON(arr *array.Uint64) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func f16FromJSON(vs []interface{}) []float16.Num {
+	o := make([]float16.Num, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Float64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = float16.New(float32(vv))
+	}
+	return o
+}
+
+func f16ToJSON(arr *array.Float16) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i).Float32()
+	}
+	return o
+}
+
+func f32FromJSON(vs []interface{}) []float32 {
+	o := make([]float32, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Float64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = float32(vv)
+	}
+	return o
+}
+
+func f32ToJSON(arr *array.Float32) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func f64FromJSON(vs []interface{}) []float64 {
+	o := make([]float64, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Float64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = float64(vv)
+	}
+	return o
+}
+
+func f64ToJSON(arr *array.Float64) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func strFromJSON(vs []interface{}) []string {
+	o := make([]string, len(vs))
+	for i, v := range vs {
+		switch v := v.(type) {
+		case string:
+			o[i] = v
+		case json.Number:
+			o[i] = v.String()
+		default:
+			panic(errors.Errorf("could not convert %v (%T) to a string", v, v))
+		}
+	}
+	return o
+}
+
+func strToJSON(arr *array.String) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func bytesFromJSON(vs []interface{}) [][]byte {
+	o := make([][]byte, len(vs))
+	for i, v := range vs {
+		var err error
+		switch v := v.(type) {
+		case string:
+			o[i], err = hex.DecodeString(v)
+		case json.Number:
+			o[i], err = hex.DecodeString(v.String())
+		default:
+			panic(errors.Errorf("could not convert %v (%T) to a string", v, v))
+		}
+		if err != nil {
+			panic(errors.Errorf("could not decode %v: %v", v, err))
+		}
+	}
+	return o
+}
+
+func bytesToJSON(arr *array.Binary) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = strings.ToUpper(hex.EncodeToString(arr.Value(i)))
+	}
+	return o
+}
+
+func date32FromJSON(vs []interface{}) []arrow.Date32 {
+	o := make([]arrow.Date32, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Date32(vv)
+	}
+	return o
+}
+
+func date32ToJSON(arr *array.Date32) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int32(arr.Value(i))
+	}
+	return o
+}
+
+func date64FromJSON(vs []interface{}) []arrow.Date64 {
+	o := make([]arrow.Date64, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Date64(vv)
+	}
+	return o
+}
+
+func date64ToJSON(arr *array.Date64) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int64(arr.Value(i))
+	}
+	return o
+}
+
+func time32FromJSON(vs []interface{}) []arrow.Time32 {
+	o := make([]arrow.Time32, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Time32(vv)
+	}
+	return o
+}
+
+func time32ToJSON(arr *array.Time32) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int32(arr.Value(i))
+	}
+	return o
+}
+
+func time64FromJSON(vs []interface{}) []arrow.Time64 {
+	o := make([]arrow.Time64, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Time64(vv)
+	}
+	return o
+}
+
+func time64ToJSON(arr *array.Time64) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int64(arr.Value(i))
+	}
+	return o
+}
+
+func timestampFromJSON(vs []interface{}) []arrow.Timestamp {
+	o := make([]arrow.Timestamp, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Timestamp(vv)
+	}
+	return o
+}
+
+func timestampToJSON(arr *array.Timestamp) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int64(arr.Value(i))
+	}
+	return o
+}
+
+func monthintervalFromJSON(vs []interface{}) []arrow.MonthInterval {
+	o := make([]arrow.MonthInterval, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.MonthInterval(int32(vv))
+	}
+	return o
+}
+
+func monthintervalToJSON(arr *array.MonthInterval) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = int32(arr.Value(i))
+	}
+	return o
+}
+
+func daytimeintervalFromJSON(vs []interface{}) []arrow.DayTimeInterval {
+	o := make([]arrow.DayTimeInterval, len(vs))
+	for i, vv := range vs {
+		v := vv.(map[string]interface{})
+		days, err := v["days"].(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		ms, err := v["milliseconds"].(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.DayTimeInterval{Days: int32(days), Milliseconds: int32(ms)}
+	}
+	return o
+}
+
+func daytimeintervalToJSON(arr *array.DayTimeInterval) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func durationFromJSON(vs []interface{}) []arrow.Duration {
+	o := make([]arrow.Duration, len(vs))
+	for i, v := range vs {
+		vv, err := v.(json.Number).Int64()
+		if err != nil {
+			panic(err)
+		}
+		o[i] = arrow.Duration(vv)
+	}
+	return o
+}
+
+func durationToJSON(arr *array.Duration) []interface{} {
+	o := make([]interface{}, arr.Len())
+	for i := range o {
+		o[i] = arr.Value(i)
+	}
+	return o
+}
+
+func buildArray(bldr array.Builder, data array.Interface) {
+	defer data.Release()
+
+	switch bldr := bldr.(type) {
+	default:
+		panic(errors.Errorf("unknown builder %T", bldr))
+
+	case *array.BooleanBuilder:
+		data := data.(*array.Boolean)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Int8Builder:
+		data := data.(*array.Int8)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Int16Builder:
+		data := data.(*array.Int16)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Int32Builder:
+		data := data.(*array.Int32)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Int64Builder:
+		data := data.(*array.Int64)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Uint8Builder:
+		data := data.(*array.Uint8)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Uint16Builder:
+		data := data.(*array.Uint16)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Uint32Builder:
+		data := data.(*array.Uint32)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Uint64Builder:
+		data := data.(*array.Uint64)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Float32Builder:
+		data := data.(*array.Float32)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.Float64Builder:
+		data := data.(*array.Float64)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+
+	case *array.StringBuilder:
+		data := data.(*array.String)
+		for i := 0; i < data.Len(); i++ {
+			switch {
+			case data.IsValid(i):
+				bldr.Append(data.Value(i))
+			default:
+				bldr.AppendNull()
+			}
+		}
+	}
+}
diff --git a/go/arrow/internal/arrjson/arrjson_test.go b/go/arrow/internal/arrjson/arrjson_test.go
new file mode 100644
index 0000000..61dfd53
--- /dev/null
+++ b/go/arrow/internal/arrjson/arrjson_test.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/internal/arrdata"
+	"github.com/apache/arrow/go/arrow/memory"
+)
+
+func TestReadWrite(t *testing.T) {
+	for name, recs := range arrdata.Records {
+		t.Run(name, func(t *testing.T) {
+			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+			defer mem.AssertSize(t, 0)
+
+			f, err := ioutil.TempFile("", "arrjson-")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer f.Close()
+			defer os.RemoveAll(f.Name())
+
+			w, err := NewWriter(f, recs[0].Schema())
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer w.Close()
+
+			for i, rec := range recs {
+				err = w.Write(rec)
+				if err != nil {
+					t.Fatalf("could not write record[%d] to JSON: %v", i, err)
+				}
+			}
+
+			err = w.Close()
+			if err != nil {
+				t.Fatalf("could not close JSON writer: %v", err)
+			}
+
+			err = f.Sync()
+			if err != nil {
+				t.Fatalf("could not sync data to disk: %v", err)
+			}
+
+			_, err = f.Seek(0, io.SeekStart)
+			if err != nil {
+				t.Fatalf("could not rewind file: %v", err)
+			}
+
+			r, err := NewReader(f, WithAllocator(mem), WithSchema(recs[0].Schema()))
+			if err != nil {
+				raw, _ := ioutil.ReadFile(f.Name())
+				t.Fatalf("could not read JSON file: %v\n%v\n", err, string(raw))
+			}
+			defer r.Release()
+
+			r.Retain()
+			r.Release()
+
+			if got, want := r.Schema(), recs[0].Schema(); !got.Equal(want) {
+				t.Fatalf("invalid schema\ngot:\n%v\nwant:\n%v\n", got, want)
+			}
+
+			if got, want := r.NumRecords(), len(recs); got != want {
+				t.Fatalf("invalid number of records: got=%d, want=%d", got, want)
+			}
+
+			nrecs := 0
+			for {
+				rec, err := r.Read()
+				if err == io.EOF {
+					break
+				}
+				if err != nil {
+					t.Fatalf("could not read record[%d]: %v", nrecs, err)
+				}
+
+				if !array.RecordEqual(rec, recs[nrecs]) {
+					t.Fatalf("records[%d] differ", nrecs)
+				}
+				nrecs++
+			}
+
+			if got, want := nrecs, len(recs); got != want {
+				t.Fatalf("invalid number of records: got=%d, want=%d", got, want)
+			}
+		})
+	}
+}
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/internal/arrjson/option.go
similarity index 54%
copy from go/arrow/ipc/ipc.go
copy to go/arrow/internal/arrjson/option.go
index 470baf2..ea89f6e 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/internal/arrjson/option.go
@@ -14,55 +14,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package ipc // import "github.com/apache/arrow/go/arrow/ipc"
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
 
 import (
-	"io"
-
 	"github.com/apache/arrow/go/arrow"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
-const (
-	errNotArrowFile             = errString("arrow/ipc: not an Arrow file")
-	errInconsistentFileMetadata = errString("arrow/ipc: file is smaller than indicated metadata size")
-	errInconsistentSchema       = errString("arrow/ipc: tried to write record batch with different schema")
-	errMaxRecursion             = errString("arrow/ipc: max recursion depth reached")
-	errBigArray                 = errString("arrow/ipc: array larger than 2^31-1 in length")
-
-	kArrowAlignment    = 64 // buffers are padded to 64b boundaries (for SIMD)
-	kTensorAlignment   = 64 // tensors are padded to 64b boundaries
-	kArrowIPCAlignment = 8  // align on 8b boundaries in IPC
-)
-
-var (
-	paddingBytes [kArrowAlignment]byte
-	kEOS         = [4]byte{0, 0, 0, 0} // end of stream message
-)
-
-func paddedLength(nbytes int64, alignment int32) int64 {
-	align := int64(alignment)
-	return ((nbytes + align - 1) / align) * align
-}
-
-type errString string
-
-func (s errString) Error() string {
-	return string(s)
-}
-
-type ReadAtSeeker interface {
-	io.Reader
-	io.Seeker
-	io.ReaderAt
-}
-
 type config struct {
 	alloc  memory.Allocator
 	schema *arrow.Schema
-	footer struct {
-		offset int64
-	}
 }
 
 func newConfig(opts ...Option) *config {
@@ -81,13 +42,6 @@ func newConfig(opts ...Option) *config {
 // and streams.
 type Option func(*config)
 
-// WithFooterOffset specifies the Arrow footer position in bytes.
-func WithFooterOffset(offset int64) Option {
-	return func(cfg *config) {
-		cfg.footer.offset = offset
-	}
-}
-
 // WithAllocator specifies the Arrow memory allocator used while building records.
 func WithAllocator(mem memory.Allocator) Option {
 	return func(cfg *config) {
diff --git a/go/arrow/internal/arrjson/reader.go b/go/arrow/internal/arrjson/reader.go
new file mode 100644
index 0000000..cb95328
--- /dev/null
+++ b/go/arrow/internal/arrjson/reader.go
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+	"encoding/json"
+	"io"
+	"sync/atomic"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/arrio"
+	"github.com/apache/arrow/go/arrow/internal/debug"
+)
+
+type Reader struct {
+	refs int64
+
+	schema *arrow.Schema
+	recs   []array.Record
+
+	irec int // current record index. used for the arrio.Reader interface.
+}
+
+func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
+	dec := json.NewDecoder(r)
+	dec.UseNumber()
+	var raw struct {
+		Schema  Schema   `json:"schema"`
+		Records []Record `json:"batches"`
+	}
+	err := dec.Decode(&raw)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg := newConfig()
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	schema := schemaFromJSON(raw.Schema)
+	rr := &Reader{
+		refs:   1,
+		schema: schema,
+		recs:   recordsFromJSON(cfg.alloc, schema, raw.Records),
+	}
+	return rr, nil
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (r *Reader) Retain() {
+	atomic.AddInt64(&r.refs, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (r *Reader) Release() {
+	debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+
+	if atomic.AddInt64(&r.refs, -1) == 0 {
+		for i, rec := range r.recs {
+			if r.recs[i] != nil {
+				rec.Release()
+				r.recs[i] = nil
+			}
+		}
+	}
+}
+func (r *Reader) Schema() *arrow.Schema { return r.schema }
+func (r *Reader) NumRecords() int       { return len(r.recs) }
+
+func (r *Reader) Read() (array.Record, error) {
+	if r.irec == r.NumRecords() {
+		return nil, io.EOF
+	}
+	rec := r.recs[r.irec]
+	r.irec++
+	return rec, nil
+}
+
+var (
+	_ arrio.Reader = (*Reader)(nil)
+)
diff --git a/go/arrow/internal/arrjson/writer.go b/go/arrow/internal/arrjson/writer.go
new file mode 100644
index 0000000..d4f9101
--- /dev/null
+++ b/go/arrow/internal/arrjson/writer.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrjson // import "github.com/apache/arrow/go/arrow/internal/arrjson"
+
+import (
+	"encoding/json"
+	"io"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/arrio"
+)
+
+const (
+	jsonIndent    = "  "
+	jsonPrefix    = "  "
+	jsonRecPrefix = "    "
+)
+
+type Writer struct {
+	w io.Writer
+
+	schema *arrow.Schema
+	nrecs  int64
+}
+
+func NewWriter(w io.Writer, schema *arrow.Schema) (*Writer, error) {
+	ww := &Writer{
+		w:      w,
+		schema: schema,
+	}
+	_, err := ww.w.Write([]byte("{\n"))
+	if err != nil {
+		return nil, err
+	}
+
+	err = ww.writeSchema()
+	if err != nil {
+		return nil, err
+	}
+	return ww, nil
+}
+
+func (w *Writer) Write(rec array.Record) error {
+	switch {
+	case w.nrecs == 0:
+		_, err := w.w.Write([]byte(",\n" + jsonPrefix + `"batches": [` + "\n" + jsonRecPrefix))
+		if err != nil {
+			return err
+		}
+	case w.nrecs > 0:
+		_, err := w.w.Write([]byte(",\n"))
+		if err != nil {
+			return err
+		}
+	}
+
+	raw, err := json.MarshalIndent(recordToJSON(rec), jsonRecPrefix, jsonIndent)
+	if err != nil {
+		return err
+	}
+
+	_, err = w.w.Write(raw)
+	if err != nil {
+		return err
+	}
+
+	w.nrecs++
+	return nil
+}
+
+func (w *Writer) writeSchema() error {
+	_, err := w.w.Write([]byte(`  "schema": `))
+	if err != nil {
+		return err
+	}
+	raw, err := json.MarshalIndent(schemaToJSON(w.schema), jsonPrefix, jsonIndent)
+	if err != nil {
+		return err
+	}
+	_, err = w.w.Write(raw)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (w *Writer) Close() error {
+	_, err := w.w.Write([]byte("\n  ]\n}"))
+	return err
+}
+
+var (
+	_ arrio.Writer = (*Writer)(nil)
+)
diff --git a/go/arrow/ipc/cmd/arrow-cat/main.go b/go/arrow/ipc/cmd/arrow-cat/main.go
index e5f75e7..9219b68 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main.go
@@ -86,9 +86,7 @@ func main() {
 }
 
 func processStream(w io.Writer, rin io.Reader) error {
-	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-	defer mem.AssertSize(nil, 0)
-
+	mem := memory.NewGoAllocator()
 	for {
 		r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
 		if err != nil {
@@ -142,8 +140,7 @@ func processFile(w io.Writer, fname string) error {
 		return processStream(w, f)
 	}
 
-	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-	defer mem.AssertSize(nil, 0)
+	mem := memory.NewGoAllocator()
 
 	r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
 	if err != nil {
diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go
index 8f8e382..65a3691 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go
@@ -126,27 +126,36 @@ record 3...
   col[2] "time32s": [-2 (null) (null) 1 2]
   col[3] "time64ns": [-2 (null) (null) 1 2]
   col[4] "time64us": [-2 (null) (null) 1 2]
-  col[5] "timestamp": [0 (null) (null) 3 4]
-  col[6] "date32s": [-2 (null) (null) 1 2]
-  col[7] "date64s": [-2 (null) (null) 1 2]
+  col[5] "timestamp_s": [0 (null) (null) 3 4]
+  col[6] "timestamp_ms": [0 (null) (null) 3 4]
+  col[7] "timestamp_us": [0 (null) (null) 3 4]
+  col[8] "timestamp_ns": [0 (null) (null) 3 4]
+  col[9] "date32s": [-2 (null) (null) 1 2]
+  col[10] "date64s": [-2 (null) (null) 1 2]
 record 2...
   col[0] "float16s": [11 (null) (null) 14 15]
   col[1] "time32ms": [-12 (null) (null) 11 12]
   col[2] "time32s": [-12 (null) (null) 11 12]
   col[3] "time64ns": [-12 (null) (null) 11 12]
   col[4] "time64us": [-12 (null) (null) 11 12]
-  col[5] "timestamp": [10 (null) (null) 13 14]
-  col[6] "date32s": [-12 (null) (null) 11 12]
-  col[7] "date64s": [-12 (null) (null) 11 12]
+  col[5] "timestamp_s": [10 (null) (null) 13 14]
+  col[6] "timestamp_ms": [10 (null) (null) 13 14]
+  col[7] "timestamp_us": [10 (null) (null) 13 14]
+  col[8] "timestamp_ns": [10 (null) (null) 13 14]
+  col[9] "date32s": [-12 (null) (null) 11 12]
+  col[10] "date64s": [-12 (null) (null) 11 12]
 record 3...
   col[0] "float16s": [21 (null) (null) 24 25]
   col[1] "time32ms": [-22 (null) (null) 21 22]
   col[2] "time32s": [-22 (null) (null) 21 22]
   col[3] "time64ns": [-22 (null) (null) 21 22]
   col[4] "time64us": [-22 (null) (null) 21 22]
-  col[5] "timestamp": [20 (null) (null) 23 24]
-  col[6] "date32s": [-22 (null) (null) 21 22]
-  col[7] "date64s": [-22 (null) (null) 21 22]
+  col[5] "timestamp_s": [20 (null) (null) 23 24]
+  col[6] "timestamp_ms": [20 (null) (null) 23 24]
+  col[7] "timestamp_us": [20 (null) (null) 23 24]
+  col[8] "timestamp_ns": [20 (null) (null) 23 24]
+  col[9] "date32s": [-22 (null) (null) 21 22]
+  col[10] "date64s": [-22 (null) (null) 21 22]
 `,
 		},
 		{
@@ -405,27 +414,36 @@ record 3/3...
   col[2] "time32s": [-2 (null) (null) 1 2]
   col[3] "time64ns": [-2 (null) (null) 1 2]
   col[4] "time64us": [-2 (null) (null) 1 2]
-  col[5] "timestamp": [0 (null) (null) 3 4]
-  col[6] "date32s": [-2 (null) (null) 1 2]
-  col[7] "date64s": [-2 (null) (null) 1 2]
+  col[5] "timestamp_s": [0 (null) (null) 3 4]
+  col[6] "timestamp_ms": [0 (null) (null) 3 4]
+  col[7] "timestamp_us": [0 (null) (null) 3 4]
+  col[8] "timestamp_ns": [0 (null) (null) 3 4]
+  col[9] "date32s": [-2 (null) (null) 1 2]
+  col[10] "date64s": [-2 (null) (null) 1 2]
 record 2...
   col[0] "float16s": [11 (null) (null) 14 15]
   col[1] "time32ms": [-12 (null) (null) 11 12]
   col[2] "time32s": [-12 (null) (null) 11 12]
   col[3] "time64ns": [-12 (null) (null) 11 12]
   col[4] "time64us": [-12 (null) (null) 11 12]
-  col[5] "timestamp": [10 (null) (null) 13 14]
-  col[6] "date32s": [-12 (null) (null) 11 12]
-  col[7] "date64s": [-12 (null) (null) 11 12]
+  col[5] "timestamp_s": [10 (null) (null) 13 14]
+  col[6] "timestamp_ms": [10 (null) (null) 13 14]
+  col[7] "timestamp_us": [10 (null) (null) 13 14]
+  col[8] "timestamp_ns": [10 (null) (null) 13 14]
+  col[9] "date32s": [-12 (null) (null) 11 12]
+  col[10] "date64s": [-12 (null) (null) 11 12]
 record 3...
   col[0] "float16s": [21 (null) (null) 24 25]
   col[1] "time32ms": [-22 (null) (null) 21 22]
   col[2] "time32s": [-22 (null) (null) 21 22]
   col[3] "time64ns": [-22 (null) (null) 21 22]
   col[4] "time64us": [-22 (null) (null) 21 22]
-  col[5] "timestamp": [20 (null) (null) 23 24]
-  col[6] "date32s": [-22 (null) (null) 21 22]
-  col[7] "date64s": [-22 (null) (null) 21 22]
+  col[5] "timestamp_s": [20 (null) (null) 23 24]
+  col[6] "timestamp_ms": [20 (null) (null) 23 24]
+  col[7] "timestamp_us": [20 (null) (null) 23 24]
+  col[8] "timestamp_ns": [20 (null) (null) 23 24]
+  col[9] "date32s": [-22 (null) (null) 21 22]
+  col[10] "date64s": [-22 (null) (null) 21 22]
 `,
 		},
 		{
@@ -437,27 +455,36 @@ record 1/3...
   col[2] "time32s": [-2 (null) (null) 1 2]
   col[3] "time64ns": [-2 (null) (null) 1 2]
   col[4] "time64us": [-2 (null) (null) 1 2]
-  col[5] "timestamp": [0 (null) (null) 3 4]
-  col[6] "date32s": [-2 (null) (null) 1 2]
-  col[7] "date64s": [-2 (null) (null) 1 2]
+  col[5] "timestamp_s": [0 (null) (null) 3 4]
+  col[6] "timestamp_ms": [0 (null) (null) 3 4]
+  col[7] "timestamp_us": [0 (null) (null) 3 4]
+  col[8] "timestamp_ns": [0 (null) (null) 3 4]
+  col[9] "date32s": [-2 (null) (null) 1 2]
+  col[10] "date64s": [-2 (null) (null) 1 2]
 record 2/3...
   col[0] "float16s": [11 (null) (null) 14 15]
   col[1] "time32ms": [-12 (null) (null) 11 12]
   col[2] "time32s": [-12 (null) (null) 11 12]
   col[3] "time64ns": [-12 (null) (null) 11 12]
   col[4] "time64us": [-12 (null) (null) 11 12]
-  col[5] "timestamp": [10 (null) (null) 13 14]
-  col[6] "date32s": [-12 (null) (null) 11 12]
-  col[7] "date64s": [-12 (null) (null) 11 12]
+  col[5] "timestamp_s": [10 (null) (null) 13 14]
+  col[6] "timestamp_ms": [10 (null) (null) 13 14]
+  col[7] "timestamp_us": [10 (null) (null) 13 14]
+  col[8] "timestamp_ns": [10 (null) (null) 13 14]
+  col[9] "date32s": [-12 (null) (null) 11 12]
+  col[10] "date64s": [-12 (null) (null) 11 12]
 record 3/3...
   col[0] "float16s": [21 (null) (null) 24 25]
   col[1] "time32ms": [-22 (null) (null) 21 22]
   col[2] "time32s": [-22 (null) (null) 21 22]
   col[3] "time64ns": [-22 (null) (null) 21 22]
   col[4] "time64us": [-22 (null) (null) 21 22]
-  col[5] "timestamp": [20 (null) (null) 23 24]
-  col[6] "date32s": [-22 (null) (null) 21 22]
-  col[7] "date64s": [-22 (null) (null) 21 22]
+  col[5] "timestamp_s": [20 (null) (null) 23 24]
+  col[6] "timestamp_ms": [20 (null) (null) 23 24]
+  col[7] "timestamp_us": [20 (null) (null) 23 24]
+  col[8] "timestamp_ns": [20 (null) (null) 23 24]
+  col[9] "date32s": [-22 (null) (null) 21 22]
+  col[10] "date64s": [-22 (null) (null) 21 22]
 `,
 		},
 		{
diff --git a/go/arrow/ipc/cmd/arrow-file-to-stream/main.go b/go/arrow/ipc/cmd/arrow-file-to-stream/main.go
new file mode 100644
index 0000000..09aadac
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-file-to-stream/main.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-file-to-stream"
+
+import (
+	"flag"
+	"io"
+	"log"
+	"os"
+
+	"github.com/apache/arrow/go/arrow/arrio"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/apache/arrow/go/arrow/memory"
+	"github.com/pkg/errors"
+)
+
+func main() {
+	log.SetPrefix("arrow-file-to-stream: ")
+	log.SetFlags(0)
+
+	flag.Parse()
+
+	if flag.NArg() != 1 {
+		flag.Usage()
+		log.Fatalf("missing path to input ARROW file")
+	}
+
+	err := processFile(os.Stdout, flag.Arg(0))
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func processFile(w io.Writer, fname string) error {
+	r, err := os.Open(fname)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer r.Close()
+
+	mem := memory.NewGoAllocator()
+
+	rr, err := ipc.NewFileReader(r, ipc.WithAllocator(mem))
+	if err != nil {
+		if errors.Cause(err) == io.EOF {
+			return nil
+		}
+		return err
+	}
+	defer rr.Close()
+
+	ww := ipc.NewWriter(w, ipc.WithAllocator(mem), ipc.WithSchema(rr.Schema()))
+	defer ww.Close()
+
+	n, err := arrio.Copy(ww, rr)
+	if err != nil {
+		return errors.Wrap(err, "could not copy ARROW stream")
+	}
+	if got, want := n, int64(rr.NumRecords()); got != want {
+		return errors.Errorf("invalid number of records written (got=%d, want=%d)", got, want)
+	}
+
+	err = ww.Close()
+	if err != nil {
+		return errors.Wrap(err, "could not close output ARROW stream")
+	}
+
+	return nil
+}
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
similarity index 57%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
index 64ea6cc..e32bf48 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-file-to-stream/main_test.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-file-to-stream"
 
 import (
 	"io"
@@ -23,11 +23,10 @@ import (
 	"testing"
 
 	"github.com/apache/arrow/go/arrow/internal/arrdata"
-	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
-func TestStream(t *testing.T) {
+func TestFileToStream(t *testing.T) {
 	for name, recs := range arrdata.Records {
 		t.Run(name, func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -40,53 +39,30 @@ func TestStream(t *testing.T) {
 			defer f.Close()
 			defer os.Remove(f.Name())
 
-			{
-				w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				defer w.Close()
+			arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
 
-				for i, rec := range recs {
-					err = w.Write(rec)
-					if err != nil {
-						t.Fatalf("could not write record[%d]: %v", i, err)
-					}
-				}
-
-				err = w.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
+			o, err := ioutil.TempFile("", "arrow-ipc-")
+			if err != nil {
+				t.Fatal(err)
 			}
+			defer os.Remove(o.Name())
 
-			err = f.Sync()
+			err = processFile(o, f.Name())
 			if err != nil {
-				t.Fatalf("could not sync data to disk: %v", err)
+				t.Fatal(err)
 			}
 
-			_, err = f.Seek(0, io.SeekStart)
+			err = o.Sync()
 			if err != nil {
-				t.Fatalf("could not seek to start: %v", err)
+				t.Fatal(err)
 			}
 
-			{
-				r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer r.Release()
-
-				n := 0
-				for r.Next() {
-					rec := r.Record()
-					if !cmpRecs(rec, recs[n]) {
-						t.Fatalf("records[%d] differ", n)
-					}
-					n++
-				}
-
-				if len(recs) != n {
-					t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
-				}
+			_, err = o.Seek(0, io.SeekStart)
+			if err != nil {
+				t.Fatal(err)
 			}
+
+			arrdata.CheckArrowStream(t, o, mem, recs[0].Schema(), recs)
 		})
 	}
 }
diff --git a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
new file mode 100644
index 0000000..fe6d668
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-json-integration-test"
+
+import (
+	"flag"
+	"log"
+	"os"
+
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/arrio"
+	"github.com/apache/arrow/go/arrow/internal/arrjson"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/pkg/errors"
+)
+
+func main() {
+	log.SetPrefix("arrow-json: ")
+	log.SetFlags(0)
+
+	var (
+		arrowPath = flag.String("arrow", "", "path to ARROW file")
+		jsonPath  = flag.String("json", "", "path to JSON file")
+		mode      = flag.String("mode", "VALIDATE", "mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)")
+		verbose   = flag.Bool("verbose", true, "enable/disable verbose mode")
+	)
+
+	flag.Parse()
+
+	err := runCommand(*jsonPath, *arrowPath, *mode, *verbose)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func runCommand(jsonName, arrowName, mode string, verbose bool) error {
+	if jsonName == "" {
+		return errors.Errorf("must specify json file name")
+	}
+
+	if arrowName == "" {
+		return errors.Errorf("must specify arrow file name")
+	}
+
+	switch mode {
+	case "ARROW_TO_JSON":
+		return cnvToJSON(arrowName, jsonName, verbose)
+	case "JSON_TO_ARROW":
+		return cnvToARROW(arrowName, jsonName, verbose)
+	case "VALIDATE":
+		return validate(arrowName, jsonName, verbose)
+	default:
+		return errors.Errorf("unknown command %q", mode)
+	}
+
+	return nil
+}
+
+func cnvToJSON(arrowName, jsonName string, verbose bool) error {
+	r, err := os.Open(arrowName)
+	if err != nil {
+		return errors.Wrapf(err, "could not open ARROW file %q", arrowName)
+	}
+	defer r.Close()
+
+	w, err := os.Create(jsonName)
+	if err != nil {
+		return errors.Wrapf(err, "could not create JSON file %q", jsonName)
+	}
+	defer w.Close()
+
+	rr, err := ipc.NewFileReader(r)
+	if err != nil {
+		return errors.Wrapf(err, "could not open ARROW file reader from file %q", arrowName)
+	}
+	defer rr.Close()
+
+	if verbose {
+		log.Printf("found schema:\n%v\n", rr.Schema())
+	}
+
+	ww, err := arrjson.NewWriter(w, rr.Schema())
+	if err != nil {
+		return errors.Wrap(err, "could not create JSON encoder")
+	}
+	defer ww.Close()
+
+	n, err := arrio.Copy(ww, rr)
+	if err != nil {
+		return errors.Wrap(err, "could not convert ARROW file reader data to JSON data")
+	}
+
+	if got, want := n, int64(rr.NumRecords()); got != want {
+		return errors.Errorf("invalid number of records copied (got=%d, want=%d", got, want)
+	}
+
+	err = ww.Close()
+	if err != nil {
+		return errors.Wrapf(err, "could not close JSON encoder %q", jsonName)
+	}
+
+	err = w.Close()
+	if err != nil {
+		return errors.Wrapf(err, "could not close JSON file %q", jsonName)
+	}
+
+	return nil
+}
+
+func cnvToARROW(arrowName, jsonName string, verbose bool) error {
+	r, err := os.Open(jsonName)
+	if err != nil {
+		return errors.Wrapf(err, "could not open JSON file %q", jsonName)
+	}
+	defer r.Close()
+
+	w, err := os.Create(arrowName)
+	if err != nil {
+		return errors.Wrapf(err, "could not create ARROW file %q", arrowName)
+	}
+	defer w.Close()
+
+	rr, err := arrjson.NewReader(r)
+	if err != nil {
+		return errors.Wrapf(err, "could not open JSON file reader from file %q", jsonName)
+	}
+
+	if verbose {
+		log.Printf("found schema:\n%v\n", rr.Schema())
+	}
+
+	ww, err := ipc.NewFileWriter(w, ipc.WithSchema(rr.Schema()))
+	if err != nil {
+		return errors.Wrap(err, "could not create ARROW file writer")
+	}
+	defer ww.Close()
+
+	n, err := arrio.Copy(ww, rr)
+	if err != nil {
+		return errors.Wrap(err, "could not convert JSON data to ARROW data")
+	}
+
+	if got, want := n, int64(rr.NumRecords()); got != want {
+		return errors.Errorf("invalid number of records copied (got=%d, want=%d", got, want)
+	}
+
+	err = ww.Close()
+	if err != nil {
+		return errors.Wrapf(err, "could not close ARROW file writer %q", arrowName)
+	}
+
+	err = w.Close()
+	if err != nil {
+		return errors.Wrapf(err, "could not close ARROW file %q", arrowName)
+	}
+
+	return nil
+}
+
+func validate(arrowName, jsonName string, verbose bool) error {
+	jr, err := os.Open(jsonName)
+	if err != nil {
+		return errors.Wrapf(err, "could not open JSON file %q", jsonName)
+	}
+	defer jr.Close()
+
+	jrr, err := arrjson.NewReader(jr)
+	if err != nil {
+		return errors.Wrapf(err, "could not open JSON file reader from file %q", jsonName)
+	}
+
+	ar, err := os.Open(arrowName)
+	if err != nil {
+		return errors.Wrapf(err, "could not open ARROW file %q", arrowName)
+	}
+	defer ar.Close()
+
+	arr, err := ipc.NewFileReader(ar)
+	if err != nil {
+		return errors.Wrapf(err, "could not open ARROW file reader from file %q", arrowName)
+	}
+	defer arr.Close()
+
+	if !arr.Schema().Equal(jrr.Schema()) {
+		if verbose {
+			log.Printf("JSON schema:\n%v\nArrow schema:\n%v\n", arr.Schema(), jrr.Schema())
+		}
+		return errors.Errorf("schemas did not match")
+	}
+
+	for i := 0; i < arr.NumRecords(); i++ {
+		arec, err := arr.Read()
+		if err != nil {
+			return errors.Wrapf(err, "could not read record %d from ARROW file", i)
+		}
+		jrec, err := jrr.Read()
+		if err != nil {
+			return errors.Wrapf(err, "could not read record %d from JSON file", i)
+		}
+		if !array.RecordApproxEqual(jrec, arec) {
+			return errors.Errorf("record batch %d did not match\nJSON:\n%v\nARROW:\n%v\n",
+				i, jrec, arec,
+			)
+		}
+	}
+
+	if jn, an := jrr.NumRecords(), arr.NumRecords(); jn != an {
+		return errors.Errorf("different number of record batches: %d (JSON) vs %d (Arrow)", jn, an)
+	}
+
+	return nil
+}
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
similarity index 51%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
index 64ea6cc..a7604a8 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main_test.go
@@ -14,79 +14,75 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-json-integration-test"
 
 import (
-	"io"
 	"io/ioutil"
 	"os"
 	"testing"
 
 	"github.com/apache/arrow/go/arrow/internal/arrdata"
-	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
-func TestStream(t *testing.T) {
+func TestIntegration(t *testing.T) {
+	const verbose = true
 	for name, recs := range arrdata.Records {
 		t.Run(name, func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
 			defer mem.AssertSize(t, 0)
 
-			f, err := ioutil.TempFile("", "arrow-ipc-")
+			af1, err := ioutil.TempFile("", "arrow-json-integration-")
 			if err != nil {
 				t.Fatal(err)
 			}
-			defer f.Close()
-			defer os.Remove(f.Name())
+			defer af1.Close()
+			defer os.RemoveAll(af1.Name())
 
-			{
-				w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				defer w.Close()
+			arrdata.WriteFile(t, af1, mem, recs[0].Schema(), recs)
+			arrdata.CheckArrowFile(t, af1, mem, recs[0].Schema(), recs)
 
-				for i, rec := range recs {
-					err = w.Write(rec)
-					if err != nil {
-						t.Fatalf("could not write record[%d]: %v", i, err)
-					}
-				}
+			aj, err := ioutil.TempFile("", "arrow-json-integration-")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer aj.Close()
+			defer os.RemoveAll(aj.Name())
 
-				err = w.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
+			err = cnvToJSON(af1.Name(), aj.Name(), verbose)
+			if err != nil {
+				t.Fatal(err)
 			}
 
-			err = f.Sync()
+			err = validate(af1.Name(), aj.Name(), verbose)
 			if err != nil {
-				t.Fatalf("could not sync data to disk: %v", err)
+				t.Fatal(err)
 			}
 
-			_, err = f.Seek(0, io.SeekStart)
+			af2, err := ioutil.TempFile("", "arrow-json-integration-")
 			if err != nil {
-				t.Fatalf("could not seek to start: %v", err)
+				t.Fatal(err)
 			}
+			af2.Close()
+			os.RemoveAll(af2.Name())
 
-			{
-				r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer r.Release()
+			err = cnvToARROW(af2.Name(), aj.Name(), verbose)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-				n := 0
-				for r.Next() {
-					rec := r.Record()
-					if !cmpRecs(rec, recs[n]) {
-						t.Fatalf("records[%d] differ", n)
-					}
-					n++
-				}
+			err = validate(af2.Name(), aj.Name(), verbose)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-				if len(recs) != n {
-					t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
-				}
+			af2, err = os.Open(af2.Name())
+			if err != nil {
+				t.Fatal(err)
 			}
+			defer af2.Close()
+
+			arrdata.CheckArrowFile(t, af2, mem, recs[0].Schema(), recs)
 		})
 	}
 }
diff --git a/go/arrow/ipc/cmd/arrow-ls/main.go b/go/arrow/ipc/cmd/arrow-ls/main.go
index 43460f9..a0f235f 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main.go
@@ -84,8 +84,7 @@ func main() {
 }
 
 func processStream(w io.Writer, rin io.Reader) error {
-	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-	defer mem.AssertSize(nil, 0)
+	mem := memory.NewGoAllocator()
 
 	for {
 		r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
@@ -138,8 +137,7 @@ func processFile(w io.Writer, fname string) error {
 		return processStream(w, f)
 	}
 
-	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-	defer mem.AssertSize(nil, 0)
+	mem := memory.NewGoAllocator()
 
 	r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
 	if err != nil {
diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go
index 0488eae..71db468 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go
@@ -90,13 +90,16 @@ records: 3
 		{
 			name: "fixed_width_types",
 			want: `schema:
-  fields: 8
+  fields: 11
     - float16s: type=float16, nullable
     - time32ms: type=time32[ms], nullable
     - time32s: type=time32[s], nullable
     - time64ns: type=time64[ns], nullable
     - time64us: type=time64[us], nullable
-    - timestamp: type=timestamp[ns], nullable
+    - timestamp_s: type=timestamp[s, tz=UTC], nullable
+    - timestamp_ms: type=timestamp[ms, tz=UTC], nullable
+    - timestamp_us: type=timestamp[us, tz=UTC], nullable
+    - timestamp_ns: type=timestamp[ns, tz=UTC], nullable
     - date32s: type=date32, nullable
     - date64s: type=date64, nullable
 records: 3
diff --git a/go/arrow/ipc/cmd/arrow-stream-to-file/main.go b/go/arrow/ipc/cmd/arrow-stream-to-file/main.go
new file mode 100644
index 0000000..3dc1ec6
--- /dev/null
+++ b/go/arrow/ipc/cmd/arrow-stream-to-file/main.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-stream-to-file"
+
+import (
+	"flag"
+	"io"
+	"log"
+	"os"
+
+	"github.com/apache/arrow/go/arrow/arrio"
+	"github.com/apache/arrow/go/arrow/ipc"
+	"github.com/apache/arrow/go/arrow/memory"
+	"github.com/pkg/errors"
+)
+
+func main() {
+	log.SetPrefix("arrow-stream-to-file: ")
+	log.SetFlags(0)
+
+	flag.Parse()
+
+	err := processStream(os.Stdout, os.Stdin)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func processStream(w *os.File, r io.Reader) error {
+	mem := memory.NewGoAllocator()
+
+	rr, err := ipc.NewReader(r, ipc.WithAllocator(mem))
+	if err != nil {
+		if errors.Cause(err) == io.EOF {
+			return nil
+		}
+		return err
+	}
+
+	ww, err := ipc.NewFileWriter(w, ipc.WithAllocator(mem), ipc.WithSchema(rr.Schema()))
+	if err != nil {
+		return errors.Wrap(err, "could not create ARROW file writer")
+	}
+	defer ww.Close()
+
+	_, err = arrio.Copy(ww, rr)
+	if err != nil {
+		return errors.Wrap(err, "could not copy ARROW stream")
+	}
+
+	err = ww.Close()
+	if err != nil {
+		return errors.Wrap(err, "could not close output ARROW file")
+	}
+
+	return nil
+}
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
similarity index 59%
copy from go/arrow/ipc/stream_test.go
copy to go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
index 64ea6cc..fc7d995 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/cmd/arrow-stream-to-file/main_test.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package ipc_test
+package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-stream-to-file"
 
 import (
 	"io"
@@ -23,11 +23,10 @@ import (
 	"testing"
 
 	"github.com/apache/arrow/go/arrow/internal/arrdata"
-	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
-func TestStream(t *testing.T) {
+func TestStreamToFile(t *testing.T) {
 	for name, recs := range arrdata.Records {
 		t.Run(name, func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -40,53 +39,40 @@ func TestStream(t *testing.T) {
 			defer f.Close()
 			defer os.Remove(f.Name())
 
-			{
-				w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				defer w.Close()
-
-				for i, rec := range recs {
-					err = w.Write(rec)
-					if err != nil {
-						t.Fatalf("could not write record[%d]: %v", i, err)
-					}
-				}
-
-				err = w.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
+			arrdata.WriteStream(t, f, mem, recs[0].Schema(), recs)
 
 			err = f.Sync()
 			if err != nil {
-				t.Fatalf("could not sync data to disk: %v", err)
+				t.Fatal(err)
 			}
 
 			_, err = f.Seek(0, io.SeekStart)
 			if err != nil {
-				t.Fatalf("could not seek to start: %v", err)
+				t.Fatal(err)
+			}
+
+			o, err := ioutil.TempFile("", "arrow-ipc-")
+			if err != nil {
+				t.Fatal(err)
 			}
+			defer os.Remove(o.Name())
 
-			{
-				r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer r.Release()
+			err = processStream(o, f)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-				n := 0
-				for r.Next() {
-					rec := r.Record()
-					if !cmpRecs(rec, recs[n]) {
-						t.Fatalf("records[%d] differ", n)
-					}
-					n++
-				}
+			err = o.Sync()
+			if err != nil {
+				t.Fatal(err)
+			}
 
-				if len(recs) != n {
-					t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
-				}
+			_, err = o.Seek(0, io.SeekStart)
+			if err != nil {
+				t.Fatal(err)
 			}
+
+			arrdata.CheckArrowFile(t, o, mem, recs[0].Schema(), recs)
 		})
 	}
 }
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 9f12384..5de7efb 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -45,6 +45,9 @@ type FileReader struct {
 
 	schema *arrow.Schema
 	record array.Record
+
+	irec int   // current record index. used for the arrio.Reader interface
+	err  error // last error
 }
 
 // NewFileReader opens an Arrow file using the provided reader r.
@@ -277,6 +280,25 @@ func (f *FileReader) Record(i int) (array.Record, error) {
 	return f.record, nil
 }
 
+// Read reads the current record from the underlying stream and an error, if any.
+// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+//
+// The returned record value is valid until the next call to Read.
+// Users need to call Retain on that Record to keep it valid for longer.
+func (f *FileReader) Read() (rec array.Record, err error) {
+	if f.irec == f.NumRecords() {
+		return nil, io.EOF
+	}
+	rec, f.err = f.Record(f.irec)
+	f.irec++
+	return rec, f.err
+}
+
+// ReadAt reads the i-th record from the underlying stream and an error, if any.
+func (f *FileReader) ReadAt(i int64) (array.Record, error) {
+	return f.Record(int(i))
+}
+
 func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
 	var (
 		msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
@@ -475,8 +497,7 @@ func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
 }
 
 func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
-	field, buffers := ctx.loadCommon(2)
-	buffers = append(buffers, ctx.buffer())
+	field, buffers := ctx.loadCommon(1)
 
 	sub := ctx.loadChild(dt.Elem())
 	defer sub.Release()
diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go
index 136b6a7..98e1bde 100644
--- a/go/arrow/ipc/file_test.go
+++ b/go/arrow/ipc/file_test.go
@@ -17,13 +17,11 @@
 package ipc_test
 
 import (
-	"io"
 	"io/ioutil"
 	"os"
 	"testing"
 
 	"github.com/apache/arrow/go/arrow/internal/arrdata"
-	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
@@ -40,62 +38,8 @@ func TestFile(t *testing.T) {
 			defer f.Close()
 			defer os.Remove(f.Name())
 
-			{
-				w, err := ipc.NewFileWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer w.Close()
-
-				for i, rec := range recs {
-					err = w.Write(rec)
-					if err != nil {
-						t.Fatalf("could not write record[%d]: %v", i, err)
-					}
-				}
-
-				err = w.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
-
-				err = f.Sync()
-				if err != nil {
-					t.Fatalf("could not sync data to disk: %v", err)
-				}
-
-				_, err = f.Seek(0, io.SeekStart)
-				if err != nil {
-					t.Fatalf("could not seek to start: %v", err)
-				}
-			}
-
-			{
-				r, err := ipc.NewFileReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer r.Close()
-
-				if got, want := r.NumRecords(), len(recs); got != want {
-					t.Fatalf("invalid number of records. got=%d, want=%d", got, want)
-				}
-
-				for i := 0; i < r.NumRecords(); i++ {
-					rec, err := r.Record(i)
-					if err != nil {
-						t.Fatalf("could not read record %d: %v", i, err)
-					}
-					if !cmpRecs(rec, recs[i]) {
-						t.Fatalf("records[%d] differ", i)
-					}
-				}
-
-				err = r.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
+			arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
+			arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), recs)
 		})
 	}
 }
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 470baf2..e688974 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -20,6 +20,7 @@ import (
 	"io"
 
 	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/arrio"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
@@ -101,3 +102,12 @@ func WithSchema(schema *arrow.Schema) Option {
 		cfg.schema = schema
 	}
 }
+
+var (
+	_ arrio.Reader = (*Reader)(nil)
+	_ arrio.Writer = (*Writer)(nil)
+	_ arrio.Reader = (*FileReader)(nil)
+	_ arrio.Writer = (*FileWriter)(nil)
+
+	_ arrio.ReaderAt = (*FileReader)(nil)
+)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
deleted file mode 100644
index 01be713..0000000
--- a/go/arrow/ipc/ipc_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ipc_test
-
-import (
-	"fmt"
-	"io"
-	"strings"
-
-	"github.com/apache/arrow/go/arrow/array"
-)
-
-func cmpRecs(r1, r2 array.Record) bool {
-	// FIXME(sbinet): impl+use arrow.Record.Equal ?
-
-	if !r1.Schema().Equal(r2.Schema()) {
-		return false
-	}
-	if r1.NumCols() != r2.NumCols() {
-		return false
-	}
-	if r1.NumRows() != r2.NumRows() {
-		return false
-	}
-
-	var (
-		txt1 = new(strings.Builder)
-		txt2 = new(strings.Builder)
-	)
-
-	printRec(txt1, r1)
-	printRec(txt2, r2)
-
-	return txt1.String() == txt2.String()
-}
-
-func printRec(w io.Writer, rec array.Record) {
-	for i, col := range rec.Columns() {
-		fmt.Fprintf(w, "  col[%d] %q: %v\n", i, rec.ColumnName(i), col)
-	}
-}
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 794a643..7b4e3da 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -201,7 +201,8 @@ type fieldVisitor struct {
 	meta   map[string]string
 }
 
-func (fv *fieldVisitor) visit(dt arrow.DataType) {
+func (fv *fieldVisitor) visit(field arrow.Field) {
+	dt := field.Type
 	switch dt := dt.(type) {
 	case *arrow.NullType:
 		fv.dtype = flatbuf.TypeNull
@@ -309,7 +310,10 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 	case *arrow.TimestampType:
 		fv.dtype = flatbuf.TypeTimestamp
 		unit := unitToFB(dt.Unit)
-		tz := fv.b.CreateString(dt.TimeZone)
+		var tz flatbuffers.UOffsetT
+		if dt.TimeZone != "" {
+			tz = fv.b.CreateString(dt.TimeZone)
+		}
 		flatbuf.TimestampStart(fv.b)
 		flatbuf.TimestampAddUnit(fv.b, unit)
 		flatbuf.TimestampAddTimezone(fv.b, tz)
@@ -330,13 +334,13 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 
 	case *arrow.ListType:
 		fv.dtype = flatbuf.TypeList
-		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem()}, fv.memo))
+		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
 		flatbuf.ListStart(fv.b)
 		fv.offset = flatbuf.ListEnd(fv.b)
 
 	case *arrow.FixedSizeListType:
 		fv.dtype = flatbuf.TypeFixedSizeList
-		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem()}, fv.memo))
+		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
 		flatbuf.FixedSizeListStart(fv.b)
 		flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
 		fv.offset = flatbuf.FixedSizeListEnd(fv.b)
@@ -369,7 +373,7 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 func (fv *fieldVisitor) result(field arrow.Field) flatbuffers.UOffsetT {
 	nameFB := fv.b.CreateString(field.Name)
 
-	fv.visit(field.Type)
+	fv.visit(field)
 
 	flatbuf.FieldStartChildrenVector(fv.b, len(fv.kids))
 	for i := len(fv.kids) - 1; i >= 0; i-- {
diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go
index 919c7a5..627e67a 100644
--- a/go/arrow/ipc/reader.go
+++ b/go/arrow/ipc/reader.go
@@ -179,6 +179,24 @@ func (r *Reader) Record() array.Record {
 	return r.rec
 }
 
+// Read reads the current record from the underlying stream and an error, if any.
+// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
+func (r *Reader) Read() (array.Record, error) {
+	if r.rec != nil {
+		r.rec.Release()
+		r.rec = nil
+	}
+
+	if !r.next() {
+		if r.done {
+			return nil, io.EOF
+		}
+		return nil, r.err
+	}
+
+	return r.rec, nil
+}
+
 var (
 	_ array.RecordReader = (*Reader)(nil)
 )
diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/stream_test.go
index 64ea6cc..65d2c72 100644
--- a/go/arrow/ipc/stream_test.go
+++ b/go/arrow/ipc/stream_test.go
@@ -23,7 +23,6 @@ import (
 	"testing"
 
 	"github.com/apache/arrow/go/arrow/internal/arrdata"
-	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
 )
 
@@ -40,22 +39,7 @@ func TestStream(t *testing.T) {
 			defer f.Close()
 			defer os.Remove(f.Name())
 
-			{
-				w := ipc.NewWriter(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				defer w.Close()
-
-				for i, rec := range recs {
-					err = w.Write(rec)
-					if err != nil {
-						t.Fatalf("could not write record[%d]: %v", i, err)
-					}
-				}
-
-				err = w.Close()
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
+			arrdata.WriteStream(t, f, mem, recs[0].Schema(), recs)
 
 			err = f.Sync()
 			if err != nil {
@@ -67,26 +51,7 @@ func TestStream(t *testing.T) {
 				t.Fatalf("could not seek to start: %v", err)
 			}
 
-			{
-				r, err := ipc.NewReader(f, ipc.WithSchema(recs[0].Schema()), ipc.WithAllocator(mem))
-				if err != nil {
-					t.Fatal(err)
-				}
-				defer r.Release()
-
-				n := 0
-				for r.Next() {
-					rec := r.Record()
-					if !cmpRecs(rec, recs[n]) {
-						t.Fatalf("records[%d] differ", n)
-					}
-					n++
-				}
-
-				if len(recs) != n {
-					t.Fatalf("invalid number of records. got=%d, want=%d", n, len(recs))
-				}
-			}
+			arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs)
 		})
 	}
 }
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e526b65..3561598 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -76,6 +76,13 @@ func NewWriter(w io.Writer, opts ...Option) *Writer {
 }
 
 func (w *Writer) Close() error {
+	if !w.started {
+		err := w.start()
+		if err != nil {
+			return err
+		}
+	}
+
 	if w.pw == nil {
 		return nil
 	}
@@ -272,7 +279,14 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 
 		switch {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
-			panic("not implemented") // FIXME(sbinet) writer.cc:264
+			// slice data buffer to include the range we need now.
+			var (
+				beg = int64(arr.ValueOffset(0))
+				len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+			)
+			data = array.NewSliceData(data, beg, beg+len)
+			defer data.Release()
+			values = data.Buffers()[2]
 		default:
 			if values != nil {
 				values.Retain()
@@ -297,7 +311,14 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 
 		switch {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
-			panic("not implemented") // FIXME(sbinet) writer.cc:264
+			// slice data buffer to include the range we need now.
+			var (
+				beg = int64(arr.ValueOffset(0))
+				len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+			)
+			data = array.NewSliceData(data, beg, beg+len)
+			defer data.Release()
+			values = data.Buffers()[2]
 		default:
 			if values != nil {
 				values.Retain()
@@ -357,36 +378,17 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 
 	case *arrow.FixedSizeListType:
 		arr := arr.(*array.FixedSizeList)
-		voffsets, err := w.getZeroBasedValueOffsets(arr)
-		if err != nil {
-			return errors.Wrapf(err, "could not retrieve zero-based value offsets for array %T", arr)
-		}
-		p.body = append(p.body, voffsets)
 
 		w.depth--
-		var (
-			values        = arr.ListValues()
-			mustRelease   = false
-			values_offset int64
-			values_length int64
-		)
-		defer func() {
-			if mustRelease {
-				values.Release()
-			}
-		}()
 
-		if voffsets != nil {
-			values_offset = int64(arr.Offsets()[0])
-			values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
-		}
+		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
+		beg := int64(arr.Offset()) * size
+		end := int64(arr.Offset()+arr.Len()) * size
 
-		if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
-			// must also slice the values
-			values = array.NewSlice(values, values_offset, values_length)
-			mustRelease = true
-		}
-		err = w.visit(p, values)
+		values := array.NewSlice(arr.ListValues(), beg, end)
+		defer values.Release()
+
+		err := w.visit(p, values)
 
 		if err != nil {
 			return errors.Wrapf(err, "could not visit list element for array %T", arr)
@@ -407,6 +409,9 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr array.Interface) (*memory.B
 		// FIXME(sbinet): writer.cc:231
 		panic(fmt.Errorf("not implemented offset=%d", data.Offset()))
 	}
+	if voffsets == nil || voffsets.Len() == 0 {
+		return nil, nil
+	}
 
 	voffsets.Retain()
 	return voffsets, nil
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 7b8e562..a4763c9 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -1234,6 +1234,24 @@ class IntegrationRunner(object):
                       ' for JS once JS supports them')
                 continue
 
+            if ('Go' in (producer.name, consumer.name) and
+                    "decimal" in test_case.name):
+                print('TODO(ARROW-3676): Enable decimal tests ' +
+                      ' for Go')
+                continue
+
+            if ('Go' in (producer.name, consumer.name) and
+                    "map" in test_case.name):
+                print('TODO(ARROW-3679): Enable map tests ' +
+                      ' for Go')
+                continue
+
+            if ('Go' in (producer.name, consumer.name) and
+                    "dictionary" in test_case.name):
+                print('TODO(ARROW-3039): Enable dictionary tests ' +
+                      ' for Go')
+                continue
+
             # Make the random access file
             producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
                                               name + '.json_as_file')
@@ -1586,6 +1604,57 @@ class JSTester(Tester):
         os.system(cmd)
 
 
+class GoTester(Tester):
+    PRODUCER = True
+    CONSUMER = True
+
+    # FIXME(sbinet): revisit for Go modules
+    GOPATH = os.getenv('GOPATH', '~/go')
+    GOBIN = os.environ.get('GOBIN', os.path.join(GOPATH, 'bin'))
+
+    GO_INTEGRATION_EXE = os.path.join(GOBIN, 'arrow-json-integration-test')
+    STREAM_TO_FILE = os.path.join(GOBIN, 'arrow-stream-to-file')
+    FILE_TO_STREAM = os.path.join(GOBIN, 'arrow-file-to-stream')
+
+    name = 'Go'
+
+    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
+        cmd = [self.GO_INTEGRATION_EXE]
+
+        if arrow_path is not None:
+            cmd.extend(['-arrow', arrow_path])
+
+        if json_path is not None:
+            cmd.extend(['-json', json_path])
+
+        cmd.extend(['-mode', command])
+
+        if self.debug:
+            print(' '.join(cmd))
+
+        run_cmd(cmd)
+
+    def validate(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'VALIDATE')
+
+    def json_to_file(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+
+    def stream_to_file(self, stream_path, file_path):
+        cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
+    def file_to_stream(self, file_path, stream_path):
+        cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
+
 def get_static_json_files():
     glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
     return [JsonFile(name=os.path.basename(p), path=p, skip=set(),
@@ -1605,6 +1674,9 @@ def run_all_tests(args):
     if args.enable_js:
         testers.append(JSTester(args))
 
+    if args.enable_go:
+        testers.append(GoTester(args))
+
     static_json_files = get_static_json_files()
     generated_json_files = get_generated_json_files(tempdir=args.tempdir,
                                                     flight=args.run_flight)
@@ -1661,6 +1733,9 @@ if __name__ == '__main__':
     parser.add_argument('--enable-js', dest='enable_js',
                         action='store', type=int, default=1,
                         help='Include JavaScript in integration tests')
+    parser.add_argument('--enable-go', dest='enable_go',
+                        action='store', type=int, default=1,
+                        help='Include Go in integration tests')
 
     parser.add_argument('--write_generated_json', dest='generated_json_path',
                         action='store', default=False,