You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2021/09/30 16:50:26 UTC

[arrow] branch master updated: ARROW-14061: [Go][C++] Add Cgo Arrow Memory Pool Allocator

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8568942  ARROW-14061: [Go][C++] Add Cgo Arrow Memory Pool Allocator
8568942 is described below

commit 8568942859da725fd982a8d3f8958270764fbf79
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Thu Sep 30 12:49:33 2021 -0400

    ARROW-14061: [Go][C++] Add Cgo Arrow Memory Pool Allocator
    
    Continuing the effort to expose the Compute APIs to the Go implementation via CGO: to make memory handling safer, this adds an allocator that uses CGO to allocate memory from the C++ memory pool, along with utilities for tracking memory leaks.
    
    Closes #11206 from zeroshade/arrow-14061
    
    Lead-authored-by: Matthew Topol <mt...@factset.com>
    Co-authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 .github/workflows/go.yml                           | 122 +++++++++++++++++++++
 .../debian-go-cgo.dockerfile}                      |  34 +++---
 ci/scripts/go_build.sh                             |   6 +-
 ci/scripts/go_test.sh                              |   8 +-
 ci/scripts/msys2_setup.sh                          |   7 ++
 docker-compose.yml                                 |  19 ++++
 go/arrow/cdata/cdata.go                            |  33 +++---
 go/arrow/memory/cgo_allocator.go                   | 108 ++++++++++++++++++
 go/arrow/memory/cgo_allocator_defaults.go          |  23 ++++
 go/arrow/memory/cgo_allocator_logging.go           |  23 ++++
 go/arrow/memory/cgo_allocator_test.go              |  82 ++++++++++++++
 go/arrow/memory/checked_allocator.go               |  90 ++++++++++++++-
 go/arrow/memory/internal/cgoalloc/allocator.cc     |  71 ++++++++++++
 go/arrow/memory/internal/cgoalloc/allocator.go     | 107 ++++++++++++++++++
 go/arrow/memory/internal/cgoalloc/allocator.h      |  39 +++++++
 go/arrow/memory/internal/cgoalloc/helpers.h        |  52 +++++++++
 16 files changed, 780 insertions(+), 44 deletions(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 4f86f83..5d8034f 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -75,6 +75,40 @@ jobs:
         continue-on-error: true
         run: archery docker push debian-go
 
+  docker_cgo:
+    name: AMD64 Debian 11 GO ${{ matrix.go }} - CGO
+    runs-on: ubuntu-latest
+    if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
+    timeout-minutes: 15
+    strategy:
+      fail-fast: false
+      matrix:
+        go: [1.15]
+    env:
+      GO: ${{ matrix.go }}
+    steps:
+      - name: Checkout Arrow
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - name: Fetch Submodules and Tags
+        run: ci/scripts/util_checkout.sh
+      - name: Free Up Disk Space
+        run: ci/scripts/util_cleanup.sh
+      - name: Setup Python
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.8
+      - name: Setup Archery
+        run: pip install -e dev/archery[docker]
+      - name: Execute Docker Build
+        run: archery docker run debian-go-cgo
+      - name: Docker Push
+        if: success() && github.event_name == 'push' && github.repository == 'apache/arrow'
+        continue-on-error: true
+        run: archery docker push debian-go-cgo
+
+
   docker_cgo_python:
     name: AMD64 Debian 11 GO ${{ matrix.go }} - CGO Python
     runs-on: ubuntu-latest
@@ -163,3 +197,91 @@ jobs:
       - name: Test
         shell: bash
         run: ci/scripts/go_test.sh .
+
+  macos-cgo:
+    name: AMD64 MacOS 10.15 Go ${{ matrix.go }} - CGO
+    runs-on: macos-latest
+    if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
+    timeout-minutes: 60
+    strategy:
+      fail-fast: false
+      matrix:
+        go: [1.15]
+    env:
+      ARROW_GO_TESTCGO: "1"
+    steps:
+      - name: Install go
+        uses: actions/setup-go@v1
+        with:
+          go-version: ${{ matrix.go }}
+      - name: Checkout Arrow
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - name: Fetch Submodules and Tags
+        shell: bash
+        run: ci/scripts/util_checkout.sh
+      - name: Brew Install Arrow
+        shell: bash
+        run: brew install apache-arrow
+      - name: Build
+        shell: bash
+        run: ci/scripts/go_build.sh .
+      - name: Test
+        shell: bash
+        run: ci/scripts/go_test.sh .
+
+  windows-mingw:
+    name: AMD64 Windows MinGW ${{ matrix.mingw-n-bits }} CGO
+    runs-on: windows-latest
+    if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
+    timeout-minutes: 60
+    strategy:
+      fail-fast: false
+      matrix:
+        go: [1.15]
+        mingw-n-bits:
+          #- 32  (disabled: the CGO runtime handling currently requires 64-bit)
+          - 64
+    env:
+      ARROW_GO_TESTCGO: "1"            
+    steps:
+      - name: Disable Crash Dialogs
+        run: |
+          reg add `
+            "HKCU\SOFTWARE\Microsoft\Windows\Windows Error Reporting" `
+            /v DontShowUI `
+            /t REG_DWORD `
+            /d 1 `
+            /f
+      - name: Install go
+        uses: actions/setup-go@v1
+        with:
+          go-version: ${{ matrix.go }}
+      - name: Checkout Arrow
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - name: Fetch Submodules and Tags
+        shell: bash
+        run: ci/scripts/util_checkout.sh
+      - uses: msys2/setup-msys2@v2
+        with:
+          msystem: MINGW${{ matrix.mingw-n-bits }}
+          update: true
+      - name: Setup MSYS2
+        shell: msys2 {0}
+        run: |
+          ci/scripts/msys2_setup.sh cgo
+      - name: Update CGO Env vars
+        shell: msys2 {0}
+        run: |
+          echo "CGO_CPPFLAGS=-I$(cygpath --windows ${MINGW_PREFIX}/include)" >> $GITHUB_ENV
+          echo "CGO_LDFLAGS=-g -O2 -L$(cygpath --windows ${MINGW_PREFIX}/lib) -L$(cygpath --windows ${MINGW_PREFIX}/bin)" >> $GITHUB_ENV
+          echo "$(cygpath --windows ${MINGW_PREFIX}/bin)" >> $GITHUB_PATH
+      - name: Build
+        shell: bash
+        run: ci/scripts/go_build.sh .
+      - name: Test
+        shell: bash
+        run: ci/scripts/go_test.sh .
diff --git a/ci/scripts/go_build.sh b/ci/docker/debian-go-cgo.dockerfile
old mode 100755
new mode 100644
similarity index 54%
copy from ci/scripts/go_build.sh
copy to ci/docker/debian-go-cgo.dockerfile
index 7093be4..a494d1e
--- a/ci/scripts/go_build.sh
+++ b/ci/docker/debian-go-cgo.dockerfile
@@ -1,5 +1,3 @@
-#!/usr/bin/env bash
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,20 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -ex
-
-source_dir=${1}/go
-
-pushd ${source_dir}/arrow
-
-go get -d -t -v ./...
-go install -v ./...
-
-popd
-
-pushd ${source_dir}/parquet
-
-go get -d -t -v ./...
-go install -v ./...
-
-popd
+ARG base
+FROM ${base}
+
+ENV DEBIAN_FRONTEND noninteractive
+
+# install libarrow-dev to link against with CGO
+RUN apt-get update -y -q && \
+    apt-get install -y -q --no-install-recommends ca-certificates lsb-release wget && \
+    wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb && \
+    apt-get install -y -q --no-install-recommends ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb && \
+    apt-get update -y -q && \
+    apt-get install -y -q --no-install-recommends \
+        cmake \
+        libarrow-dev && \
+    apt-get clean
diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh
index 7093be4..267f78e 100755
--- a/ci/scripts/go_build.sh
+++ b/ci/scripts/go_build.sh
@@ -23,8 +23,12 @@ source_dir=${1}/go
 
 pushd ${source_dir}/arrow
 
+if [[ -n "${ARROW_GO_TESTCGO}" ]]; then
+    TAGS="-tags ccalloc"
+fi
+
 go get -d -t -v ./...
-go install -v ./...
+go install $TAGS -v ./...
 
 popd
 
diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh
index 18855ac..7bbfb2d 100755
--- a/ci/scripts/go_test.sh
+++ b/ci/scripts/go_test.sh
@@ -31,12 +31,18 @@ esac
 
 pushd ${source_dir}/arrow
 
+TAGS="test"
+if [[ -n "${ARROW_GO_TESTCGO}" ]]; then
+    TAGS="${TAGS},ccalloc"
+fi
+
+
 # the cgo implementation of the c data interface requires the "test"
 # tag in order to run its tests so that the testing functions implemented
 # in .c files don't get included in non-test builds.
 
 for d in $(go list ./... | grep -v vendor); do
-    go test $testargs -tags "test" $d
+    go test $testargs -tags $TAGS $d
 done
 
 popd
diff --git a/ci/scripts/msys2_setup.sh b/ci/scripts/msys2_setup.sh
index e126353..3addee8 100755
--- a/ci/scripts/msys2_setup.sh
+++ b/ci/scripts/msys2_setup.sh
@@ -60,6 +60,13 @@ case "${target}" in
     ;;
 esac
 
+case "${target}" in 
+  cgo)
+    packages+=(${MINGW_PACKAGE_PREFIX}-arrow)
+    packages+=(${MINGW_PACKAGE_PREFIX}-gcc)
+    ;;
+esac
+
 pacman \
   --needed \
   --noconfirm \
diff --git a/docker-compose.yml b/docker-compose.yml
index 482a9e0..5b4cb09 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -113,6 +113,7 @@ x-hierarchy:
       - debian-ruby
     - debian-python
   - debian-go:
+    - debian-go-cgo
     - debian-go-cgo-python
   - debian-java:
     - debian-java-jni
@@ -1200,6 +1201,24 @@ services:
         /arrow/ci/scripts/go_build.sh /arrow &&
         /arrow/ci/scripts/go_test.sh /arrow"
 
+  debian-go-cgo:
+    # Usage:
+    #   docker-compose build debian-go-cgo
+    #   docker-compose run debian-go-cgo
+    image: ${REPO}:${ARCH}-debian-${DEBIAN}-go-${GO}-cgo
+    build:
+      context: .
+      dockerfile: ci/docker/debian-go-cgo.dockerfile
+      cache_from:
+        - ${REPO}:${ARCH}-debian-${DEBIAN}-go-${GO}-cgo
+      args:
+        base: ${REPO}:${ARCH}-debian-${DEBIAN}-go-${GO}
+    shm_size: *shm-size
+    volumes: *debian-volumes
+    environment:
+      ARROW_GO_TESTCGO: "1"
+    command: *go-command
+
   debian-go-cgo-python:
     # Usage:
     #   docker-compose build debian-go-cgo-python
diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go
index 7cda454..1d4b5a4 100644
--- a/go/arrow/cdata/cdata.go
+++ b/go/arrow/cdata/cdata.go
@@ -22,9 +22,6 @@ package cdata
 
 // #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
-// typedef struct ArrowSchema ArrowSchema;
-// typedef struct ArrowArray ArrowArray;
-// typedef struct ArrowArrayStream ArrowArrayStream;
 //
 // int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); }
 // int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); }
@@ -52,12 +49,12 @@ import (
 
 type (
 	// CArrowSchema is the C Data Interface for ArrowSchemas defined in abi.h
-	CArrowSchema = C.ArrowSchema
+	CArrowSchema = C.struct_ArrowSchema
 	// CArrowArray is the C Data Interface object for Arrow Arrays as defined in abi.h
-	CArrowArray = C.ArrowArray
+	CArrowArray = C.struct_ArrowArray
 	// CArrowArrayStream is the Experimental API for handling streams of record batches
 	// through the C Data interface.
-	CArrowArrayStream = C.ArrowArrayStream
+	CArrowArrayStream = C.struct_ArrowArrayStream
 )
 
 // Map from the defined strings to their corresponding arrow.DataType interface
@@ -146,7 +143,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
 	var childFields []arrow.Field
 	if schema.n_children > 0 {
 		// call ourselves recursively if there are children.
-		var schemaChildren []*C.ArrowSchema
+		var schemaChildren []*CArrowSchema
 		// set up a slice to reference safely
 		s := (*reflect.SliceHeader)(unsafe.Pointer(&schemaChildren))
 		s.Data = uintptr(unsafe.Pointer(schema.children))
@@ -255,21 +252,21 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
 // importer to keep track when importing C ArrowArray objects.
 type cimporter struct {
 	dt       arrow.DataType
-	arr      *C.ArrowArray
+	arr      *CArrowArray
 	data     *array.Data
 	parent   *cimporter
 	children []cimporter
 	cbuffers []*C.void
 }
 
-func (imp *cimporter) importChild(parent *cimporter, src *C.ArrowArray) error {
+func (imp *cimporter) importChild(parent *cimporter, src *CArrowArray) error {
 	imp.parent = parent
 	return imp.doImport(src)
 }
 
 // import any child arrays for lists, structs, and so on.
 func (imp *cimporter) doImportChildren() error {
-	var children []*C.ArrowArray
+	var children []*CArrowArray
 	// create a proper slice for our children
 	s := (*reflect.SliceHeader)(unsafe.Pointer(&children))
 	s.Data = uintptr(unsafe.Pointer(imp.arr.children))
@@ -315,13 +312,13 @@ func (imp *cimporter) initarr() {
 
 // import is called recursively as needed for importing an array and its children
 // in order to generate array.Data objects
-func (imp *cimporter) doImport(src *C.ArrowArray) error {
+func (imp *cimporter) doImport(src *CArrowArray) error {
 	imp.initarr()
 	// move the array from the src object passed in to the one referenced by
 	// this importer. That way we can set up a finalizer on the created
 	// *array.Data object so we clean up our Array's memory when garbage collected.
 	C.ArrowArrayMove(src, imp.arr)
-	defer func(arr *C.ArrowArray) {
+	defer func(arr *CArrowArray) {
 		if imp.data != nil {
 			runtime.SetFinalizer(imp.data, func(*array.Data) {
 				C.ArrowArrayRelease(arr)
@@ -339,7 +336,9 @@ func (imp *cimporter) doImport(src *C.ArrowArray) error {
 
 	// get a view of the buffers, zero-copy. we're just looking at the pointers
 	const maxlen = 0x7fffffff
-	imp.cbuffers = (*[maxlen]*C.void)(unsafe.Pointer(imp.arr.buffers))[:imp.arr.n_buffers:imp.arr.n_buffers]
+	if imp.arr.n_buffers > 0 {
+		imp.cbuffers = (*[maxlen]*C.void)(unsafe.Pointer(imp.arr.buffers))[:imp.arr.n_buffers:imp.arr.n_buffers]
+	}
 
 	// handle each of our type cases
 	switch dt := imp.dt.(type) {
@@ -521,13 +520,13 @@ func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth int, of
 	return imp.importBuffer(bufferID, int64(bufsize))
 }
 
-func importCArrayAsType(arr *C.ArrowArray, dt arrow.DataType) (imp *cimporter, err error) {
+func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, err error) {
 	imp = &cimporter{dt: dt}
 	err = imp.doImport(arr)
 	return
 }
 
-func initReader(rdr *nativeCRecordBatchReader, stream *C.ArrowArrayStream) {
+func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) {
 	st := C.get_stream()
 	rdr.stream = &st
 	C.ArrowArrayStreamMove(stream, rdr.stream)
@@ -536,7 +535,7 @@ func initReader(rdr *nativeCRecordBatchReader, stream *C.ArrowArrayStream) {
 
 // Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface
 type nativeCRecordBatchReader struct {
-	stream *C.ArrowArrayStream
+	stream *CArrowArrayStream
 	schema *arrow.Schema
 }
 
@@ -546,7 +545,7 @@ func (n *nativeCRecordBatchReader) getError(errno int) error {
 
 func (n *nativeCRecordBatchReader) Read() (array.Record, error) {
 	if n.schema == nil {
-		var sc C.ArrowSchema
+		var sc CArrowSchema
 		errno := C.stream_get_schema(n.stream, &sc)
 		if errno != 0 {
 			return nil, n.getError(int(errno))
diff --git a/go/arrow/memory/cgo_allocator.go b/go/arrow/memory/cgo_allocator.go
new file mode 100644
index 0000000..094ab57
--- /dev/null
+++ b/go/arrow/memory/cgo_allocator.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cgo
+// +build ccalloc
+
+package memory
+
+import (
+	"runtime"
+
+	cga "github.com/apache/arrow/go/arrow/memory/internal/cgoalloc"
+)
+
+// CgoArrowAllocator satisfies the Allocator interface by handing out memory
+// from an Arrow C++ MemoryPool through cgo. It is only compiled in builds that
+// set the 'ccalloc' build tag, because it has to link against the Arrow C++
+// library.
+//
+// Its main use is exporting data through the C Data Interface. Memory that was
+// allocated on the C side can safely be retained by C/C++ code after a cgo
+// call returns. Pointers into Go-allocated memory, on the other hand, must not
+// be kept by C code beyond a single cgo call: the Go garbage collector cannot
+// see those references and might move the memory without updating them.
+//
+// If the Arrow C++ libraries are not available, remember that Allocator is
+// just an interface: anything that allocates memory through C/C++ can be
+// exposed via cgo and wrapped to satisfy it when data has to be exported
+// across the cgo boundary.
+type CgoArrowAllocator struct {
+	pool cga.CGOMemPool
+}
+
+// Allocate returns a slice of length size backed by memory from the
+// underlying C++ pool. Every call crosses the cgo boundary, which is
+// comparatively expensive, so many tiny allocations can end up slower than
+// ordinary Go allocations; reserving capacity up front matters more with this
+// allocator. This method does not zero the returned bytes.
+//
+// TODO: consider a slab-style scheme that carves small requests out of larger
+// pool allocations to amortize the per-call cgo overhead.
+func (alloc *CgoArrowAllocator) Allocate(size int) []byte {
+	return cga.CgoPoolAlloc(alloc.pool, size)
+}
+
+// Free releases b back to the underlying C++ memory pool. That memory is
+// invisible to the Go garbage collector, so a buffer that is never freed leaks.
+func (alloc *CgoArrowAllocator) Free(b []byte) {
+	cga.CgoPoolFree(alloc.pool, b)
+}
+
+// Reallocate resizes b to size bytes using the C++ pool. C does not clear
+// newly obtained memory, so any bytes past the old length are zeroed here to
+// match what Go would do.
+func (alloc *CgoArrowAllocator) Reallocate(size int, b []byte) []byte {
+	prev := len(b)
+	resized := cga.CgoPoolRealloc(alloc.pool, size, b)
+	if size <= prev {
+		return resized
+	}
+	Set(resized[prev:], 0)
+	return resized
+}
+
+// AllocatedBytes reports how many bytes the underlying C++ memory pool
+// currently has allocated.
+func (alloc *CgoArrowAllocator) AllocatedBytes() int64 {
+	return cga.CgoPoolCurBytes(alloc.pool)
+}
+
+// AssertSize is intended for tests: it reports an error through t when the
+// number of bytes currently allocated differs from sz, making it easy to
+// check that everything allocated was freed again.
+func (alloc *CgoArrowAllocator) AssertSize(t TestingT, sz int) {
+	if got := alloc.AllocatedBytes(); got != int64(sz) {
+		t.Helper()
+		t.Errorf("invalid memory size exp=%d, got=%d", sz, got)
+	}
+}
+
+// NewCgoArrowAllocator returns an allocator backed by an Arrow C++ memory pool,
+// which may itself be using jemalloc, mimalloc or another backend. Memory
+// handed out by it is invisible to the Go garbage collector, so every buffer
+// has to be freed explicitly to avoid leaks.
+//
+// A finalizer deletes the associated C++ memory pool object once the allocator
+// itself is garbage collected. Building with the 'cclog' tag makes the pool
+// log every allocation, reallocation and free.
+func NewCgoArrowAllocator() *CgoArrowAllocator {
+	a := &CgoArrowAllocator{pool: cga.NewCgoArrowAllocator(enableLogging)}
+	runtime.SetFinalizer(a, func(x *CgoArrowAllocator) {
+		cga.ReleaseCGOMemPool(x.pool)
+	})
+	return a
+}
diff --git a/go/arrow/memory/cgo_allocator_defaults.go b/go/arrow/memory/cgo_allocator_defaults.go
new file mode 100644
index 0000000..501431a
--- /dev/null
+++ b/go/arrow/memory/cgo_allocator_defaults.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cgo
+// +build ccalloc
+// +build !cclog
+
+package memory
+
+const enableLogging = false // !cclog build: the C++ pool is used without a logging wrapper
diff --git a/go/arrow/memory/cgo_allocator_logging.go b/go/arrow/memory/cgo_allocator_logging.go
new file mode 100644
index 0000000..01ad6b3
--- /dev/null
+++ b/go/arrow/memory/cgo_allocator_logging.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cgo
+// +build ccalloc
+// +build cclog
+
+package memory
+
+const enableLogging = true // cclog build: the C++ pool is wrapped in an arrow::LoggingMemoryPool
diff --git a/go/arrow/memory/cgo_allocator_test.go b/go/arrow/memory/cgo_allocator_test.go
new file mode 100644
index 0000000..e7a0376
--- /dev/null
+++ b/go/arrow/memory/cgo_allocator_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cgo
+// +build ccalloc
+
+package memory
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestCgoArrowAllocator_Allocate(t *testing.T) {
+	// sizes on both sides of a 64-byte alignment boundary plus larger requests
+	cases := []struct {
+		name string
+		sz   int
+	}{
+		{"lt alignment", 33},
+		{"gt alignment unaligned", 65},
+		{"eq alignment", 64},
+		{"large unaligned", 4097},
+		{"large aligned", 8192},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			mem := NewCgoArrowAllocator()
+			buf := mem.Allocate(tc.sz)
+			assert.NotNil(t, buf)
+			assert.Len(t, buf, tc.sz)
+			mem.AssertSize(t, tc.sz)
+
+			mem.Free(buf)
+			mem.AssertSize(t, 0)
+		})
+	}
+}
+
+func TestCgoArrowAllocator_Reallocate(t *testing.T) {
+	cases := []struct {
+		name     string
+		sz1, sz2 int
+	}{
+		{"smaller", 200, 100},
+		{"same", 200, 200},
+		{"larger", 200, 300},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			mem := NewCgoArrowAllocator()
+			orig := mem.Allocate(tc.sz1)
+			for i := range orig {
+				orig[i] = byte(i % 256)
+			}
+
+			// expected: the original prefix, with any growth zero-filled
+			want := make([]byte, tc.sz2)
+			copy(want, orig)
+			got := mem.Reallocate(tc.sz2, orig)
+			assert.Equal(t, want, got)
+			mem.AssertSize(t, tc.sz2)
+
+			mem.Free(got)
+			mem.AssertSize(t, 0)
+		})
+	}
+}
diff --git a/go/arrow/memory/checked_allocator.go b/go/arrow/memory/checked_allocator.go
index 24dc8df..cc672af 100644
--- a/go/arrow/memory/checked_allocator.go
+++ b/go/arrow/memory/checked_allocator.go
@@ -16,10 +16,19 @@
 
 package memory
 
+import (
+	"os"
+	"runtime"
+	"strconv"
+	"sync"
+	"unsafe"
+)
+
 type CheckedAllocator struct {
-	mem  Allocator
-	base int
-	sz   int
+	mem Allocator // allocator every call is forwarded to
+	sz  int       // running total: bytes allocated minus bytes freed
+
+	allocs sync.Map // base address (uintptr) of each tracked allocation -> *dalloc
 }
 
 func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
@@ -28,17 +37,79 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
 
 func (a *CheckedAllocator) Allocate(size int) []byte {
 	a.sz += size
-	return a.mem.Allocate(size)
+	out := a.mem.Allocate(size)
+	// zero-length allocations have no base address to key on, so only non-empty
+	// ones are tracked, tagged with the caller allocFrames up the stack
+	if size != 0 {
+		base := uintptr(unsafe.Pointer(&out[0]))
+		if pc, _, line, ok := runtime.Caller(allocFrames); ok {
+			a.allocs.Store(base, &dalloc{pc: pc, line: line, sz: size})
+		}
+	}
+	return out
 }
 
 func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
 	a.sz += size - len(b)
-	return a.mem.Reallocate(size, b)
+
+	oldptr := uintptr(unsafe.Pointer(&b[0]))
+	out := a.mem.Reallocate(size, b)
+	if size == 0 {
+		return out
+	}
+
+	newptr := uintptr(unsafe.Pointer(&out[0]))
+	a.allocs.Delete(oldptr)
+	if pc, _, l, ok := runtime.Caller(reallocFrames); ok {
+		a.allocs.Store(newptr, &dalloc{pc: pc, line: l, sz: size})
+	}
+	return out
 }
 
 func (a *CheckedAllocator) Free(b []byte) {
 	a.sz -= len(b)
-	a.mem.Free(b)
+	// the wrapped allocator gets b back once the bookkeeping below is done
+	defer a.mem.Free(b)
+
+	// Allocate/Reallocate never record empty buffers, so an empty slice has
+	// no entry to forget
+	if len(b) != 0 {
+		a.allocs.Delete(uintptr(unsafe.Pointer(&b[0])))
+	}
+}
+
+// Number of stack frames runtime.Caller skips when recording where an
+// allocation or reallocation came from. The defaults assume allocations usually
+// come through memory.Buffer (Resize/Reserve/etc.) rather than direct calls to
+// Allocate/Reallocate, and skip Buffer's internal frames to reach the real caller.
+const (
+	defAllocFrames   = 4
+	defReallocFrames = 3
+)
+
+// Effective frame depths. Override them with the ARROW_CHECKED_ALLOC_FRAMES and
+// ARROW_CHECKED_REALLOC_FRAMES environment variables (read once in init) when
+// the defaults point at the wrong caller while hunting memory leaks.
+var allocFrames, reallocFrames int = defAllocFrames, defReallocFrames
+
+func init() {
+	// an unset or non-integer variable leaves the default depth untouched
+	for env, dst := range map[string]*int{
+		"ARROW_CHECKED_ALLOC_FRAMES":   &allocFrames,
+		"ARROW_CHECKED_REALLOC_FRAMES": &reallocFrames,
+	} {
+		if val, ok := os.LookupEnv(env); ok {
+			if f, err := strconv.Atoi(val); err == nil {
+				*dst = f
+			}
+		}
+	}
+}
+
+type dalloc struct {
+	pc   uintptr // program counter of the recorded caller (from runtime.Caller)
+	line int     // source line of that caller
+	sz   int     // size in bytes of the tracked allocation
 }
 
 type TestingT interface {
@@ -47,6 +118,13 @@ type TestingT interface {
 }
 
 func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
+	a.allocs.Range(func(_, value interface{}) bool {
+		info := value.(*dalloc)
+		f := runtime.FuncForPC(info.pc)
+		t.Errorf("LEAK of %d bytes FROM %s line %d\n", info.sz, f.Name(), info.line)
+		return true
+	})
+
 	if a.sz != sz {
 		t.Helper()
 		t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz)
diff --git a/go/arrow/memory/internal/cgoalloc/allocator.cc b/go/arrow/memory/internal/cgoalloc/allocator.cc
new file mode 100644
index 0000000..b2b0373
--- /dev/null
+++ b/go/arrow/memory/internal/cgoalloc/allocator.cc
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// +build ccalloc
+
+#include "allocator.h"
+#include <memory>
+#include "arrow/memory_pool.h"
+#include "helpers.h"
+
+// Backing state for one ArrowMemoryPool handle.
+struct mem_holder {
+    // Proxy over the process-wide default pool owned by this handle, so that
+    // bytes_allocated() reports only what was allocated through this handle.
+    std::unique_ptr<arrow::MemoryPool> tracking_pool;
+    // Optional LoggingMemoryPool layered on top of tracking_pool.
+    std::unique_ptr<arrow::MemoryPool> logging_pool;
+    arrow::MemoryPool* pool{nullptr};  // pool every call below is routed through
+};
+
+ArrowMemoryPool arrow_create_memory_pool(bool enable_logging) {
+    auto holder = std::make_shared<mem_holder>();
+    holder->tracking_pool = std::make_unique<arrow::ProxyMemoryPool>(arrow::default_memory_pool());
+    holder->pool = holder->tracking_pool.get();
+    if (enable_logging) {
+        holder->logging_pool = std::make_unique<arrow::LoggingMemoryPool>(holder->pool);
+        holder->pool = holder->logging_pool.get();
+    }
+    return create_ref(holder);
+}
+
+void arrow_release_pool(ArrowMemoryPool pool) {
+    release_ref<mem_holder>(pool);
+}
+
+// Returns 0 on success, 1 if the pool reported an error.
+int arrow_pool_allocate(ArrowMemoryPool pool, int64_t size, uint8_t** out) {
+    auto holder = retrieve_instance<mem_holder>(pool);
+    return holder->pool->Allocate(size, out).ok() ? 0 : 1;
+}
+
+void arrow_pool_free(ArrowMemoryPool pool, uint8_t* buffer, int64_t size) {
+    auto holder = retrieve_instance<mem_holder>(pool);
+    holder->pool->Free(buffer, size);
+}
+
+// Returns 0 on success, 1 if the pool reported an error.
+int arrow_pool_reallocate(ArrowMemoryPool pool, int64_t old_size, int64_t new_size, uint8_t** ptr) {
+    auto holder = retrieve_instance<mem_holder>(pool);
+    return holder->pool->Reallocate(old_size, new_size, ptr).ok() ? 0 : 1;
+}
+
+// Bytes currently allocated through this handle's pool.
+int64_t arrow_pool_bytes_allocated(ArrowMemoryPool pool) {
+    auto holder = retrieve_instance<mem_holder>(pool);
+    return holder->pool->bytes_allocated();
+}
diff --git a/go/arrow/memory/internal/cgoalloc/allocator.go b/go/arrow/memory/internal/cgoalloc/allocator.go
new file mode 100644
index 0000000..213e759
--- /dev/null
+++ b/go/arrow/memory/internal/cgoalloc/allocator.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build ccalloc
+
+package cgoalloc
+
+// #cgo !windows pkg-config: arrow
+// #cgo CXXFLAGS: -std=c++14
+// #cgo windows LDFLAGS:  -larrow
+// #include "allocator.h"
+import "C"
+import (
+	"reflect"
+	"unsafe"
+)
+
+// CGOMemPool is an alias for the C ArrowMemoryPool type declared in
+// allocator.h, which is a typedef of uintptr_t. It is an opaque handle to a
+// pool living on the C++ side: obtain one from NewCgoArrowAllocator and
+// release it with ReleaseCGOMemPool.
+type CGOMemPool = C.ArrowMemoryPool
+
+// CgoPoolAlloc allocates a block of memory of length 'size' using the memory
+// pool that is passed in. The returned slice is backed by memory obtained from
+// the C++ pool, not the Go heap. It must be handed back with CgoPoolFree (or
+// resized with CgoPoolRealloc) instead of being left to the garbage collector.
+// A size of 0 returns a nil slice without calling into the pool.
+//
+// If the pool reports an allocation failure this panics. Otherwise it would
+// return a slice whose length claims 'size' bytes that were never allocated.
+func CgoPoolAlloc(pool CGOMemPool, size int) []byte {
+	var ret []byte
+	if size == 0 {
+		return ret
+	}
+
+	var out *C.uint8_t
+	if C.arrow_pool_allocate(pool, C.int64_t(size), &out) != 0 {
+		panic("cgoalloc: C++ memory pool failed to allocate memory")
+	}
+
+	s := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
+	s.Data = uintptr(unsafe.Pointer(out))
+	s.Len = size
+	s.Cap = size
+
+	return ret
+}
+
+// CgoPoolRealloc calls 'reallocate' on the block of memory passed in, which must
+// be a slice that was returned by CgoPoolAlloc or CgoPoolRealloc. The block is
+// resized to 'size' bytes. len(b) is reported to the pool as the current
+// allocation size, so b must not have been resliced. An empty b is treated as
+// a fresh allocation through CgoPoolAlloc. After a successful call only the
+// returned slice should be used, since the data may have moved.
+//
+// If the pool reports a failure this panics. Otherwise it would re-slice the
+// old buffer to a length it does not actually have.
+func CgoPoolRealloc(pool CGOMemPool, size int, b []byte) []byte {
+	if len(b) == 0 {
+		return CgoPoolAlloc(pool, size)
+	}
+
+	oldSize := C.int64_t(len(b))
+	data := (*C.uint8_t)(unsafe.Pointer(&b[0]))
+	if C.arrow_pool_reallocate(pool, oldSize, C.int64_t(size), &data) != 0 {
+		panic("cgoalloc: C++ memory pool failed to reallocate memory")
+	}
+
+	var ret []byte
+	s := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
+	s.Data = uintptr(unsafe.Pointer(data))
+	s.Len = size
+	s.Cap = size
+
+	return ret
+}
+
+// CgoPoolFree hands a block of memory back to the given pool. The slice
+// *must* be one returned by CgoPoolAlloc or CgoPoolRealloc, because its
+// length is passed to the pool as the allocation size. Empty slices are
+// ignored.
+func CgoPoolFree(pool CGOMemPool, b []byte) {
+	n := len(b)
+	if n > 0 {
+		start := (*C.uint8_t)(unsafe.Pointer(&b[0]))
+		C.arrow_pool_free(pool, start, C.int64_t(n))
+	}
+}
+
+// CgoPoolCurBytes reports how many bytes the given pool currently has
+// allocated, as returned by the C++ pool's bytes_allocated().
+func CgoPoolCurBytes(pool CGOMemPool) int64 {
+	allocated := C.arrow_pool_bytes_allocated(pool)
+	return int64(allocated)
+}
+
+// ReleaseCGOMemPool drops the C++-side handle for the given pool. The pool
+// object it refers to is freed only if this handle owned it (as with the
+// logging wrapper). The handle must not be used again afterwards.
+func ReleaseCGOMemPool(pool CGOMemPool) {
+	handle := pool
+	C.arrow_release_pool(handle)
+}
+
+// NewCgoArrowAllocator sets up a memory pool on the C++ side and returns an
+// opaque handle to it for use with the other functions in this package.
+// Release the handle with ReleaseCGOMemPool when it is no longer needed.
+//
+// When logging is true, a logging proxy wraps Arrow's default memory pool and
+// outputs a line for every allocate, reallocate and free, including the size.
+// Otherwise the handle refers directly to Arrow's default memory pool.
+func NewCgoArrowAllocator(logging bool) CGOMemPool {
+	enableLogging := C.bool(logging)
+	return C.arrow_create_memory_pool(enableLogging)
+}
diff --git a/go/arrow/memory/internal/cgoalloc/allocator.h b/go/arrow/memory/internal/cgoalloc/allocator.h
new file mode 100644
index 0000000..0c87443
--- /dev/null
+++ b/go/arrow/memory/internal/cgoalloc/allocator.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Opaque handle to a pool managed on the C++ side. Obtain one from
+// arrow_create_memory_pool and pass it to arrow_release_pool when done.
+typedef uintptr_t ArrowMemoryPool;
+
+// Create a new pool handle. When enable_logging is true, operations go through
+// an arrow::LoggingMemoryPool wrapping Arrow's default pool; otherwise they go
+// straight to arrow::default_memory_pool().
+ArrowMemoryPool arrow_create_memory_pool(bool enable_logging);
+// Allocate `size` bytes and store the address in *out.
+// Returns 0 on success and 1 if the pool reported an error.
+int arrow_pool_allocate(ArrowMemoryPool pool, int64_t size, uint8_t** out);
+// Resize the buffer at *ptr from old_size to new_size; *ptr is updated with
+// the (possibly new) address. Returns 0 on success and 1 on error.
+int arrow_pool_reallocate(ArrowMemoryPool pool, int64_t old_size, int64_t new_size, uint8_t** ptr);
+// Return `buffer`, which holds `size` bytes, to the pool.
+void arrow_pool_free(ArrowMemoryPool pool, uint8_t* buffer, int64_t size);
+// Current value of the pool's bytes_allocated() counter.
+int64_t arrow_pool_bytes_allocated(ArrowMemoryPool pool);
+// Release the handle; the handle must not be used afterwards.
+void arrow_release_pool(ArrowMemoryPool pool);
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/go/arrow/memory/internal/cgoalloc/helpers.h b/go/arrow/memory/internal/cgoalloc/helpers.h
new file mode 100644
index 0000000..fa5feb6
--- /dev/null
+++ b/go/arrow/memory/internal/cgoalloc/helpers.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+// helper functions to be included by C++ code for interacting with Cgo
+
+// create_ref moves the given shared_ptr into a new heap-allocated shared_ptr
+// and returns that heap object's address as an opaque uintptr_t handle. The
+// handle can be passed to retrieve_instance to get the object back, and must
+// eventually be passed to release_ref. Because the handle holds its own
+// reference, the object outlives the exported function so Go can keep using it.
+template <typename T>
+uintptr_t create_ref(std::shared_ptr<T> t) {
+    // `t` is already our own copy (taken by value), so moving it avoids a
+    // second atomic ref-count increment/decrement.
+    auto* retained_ptr = new std::shared_ptr<T>(std::move(t));
+    return reinterpret_cast<uintptr_t>(retained_ptr);
+}
+
+// retrieve_instance maps a handle produced by create_ref back to the object it
+// refers to. It returns a copy of the stored shared_ptr, so the caller shares
+// ownership for as long as it holds the result. The handle must still be live
+// (not yet passed to release_ref) when this is called.
+template <typename T>
+std::shared_ptr<T> retrieve_instance(uintptr_t ref) {
+    const auto& stored = *reinterpret_cast<const std::shared_ptr<T>*>(ref);
+    return stored;
+}
+
+// release_ref destroys the heap-allocated shared_ptr behind a handle created
+// by create_ref. The referenced object is deleted only if that was its last
+// owner, following the usual shared_ptr rules. The handle is invalid afterwards.
+template <typename T>
+void release_ref(uintptr_t ref) {
+    delete reinterpret_cast<std::shared_ptr<T>*>(ref);
+}