You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/11/23 16:27:23 UTC

[arrow] branch master updated: ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a47e8dced ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)
7a47e8dced is described below

commit 7a47e8dcedd2f3198df90d68650a5e9b5597bb58
Author: David Li <li...@gmail.com>
AuthorDate: Wed Nov 23 11:27:16 2022 -0500

    ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)
    
    - Document some of the memory management internals
    - Note that in practice, you have to use the concrete implementations of certain interfaces
    - Begin integrating C Data into the full memory management stack instead of trying to patch it in at the edges
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 java/.gitignore                                    |   2 -
 .../java/org/apache/arrow/c/ArrayImporter.java     |  57 ++--
 .../main/java/org/apache/arrow/c/ArrowArray.java   |   6 +
 .../apache/arrow/c/BufferImportTypeVisitor.java    | 322 +++++++++++++++++++++
 .../org/apache/arrow/c/CDataReferenceManager.java  | 124 --------
 .../apache/arrow/c/ReferenceCountedArrowArray.java |  74 +++++
 .../org/apache/arrow/c/ArrowArrayUtilityTest.java  | 147 ++++++++++
 .../java/org/apache/arrow/c/RoundtripTest.java     |  14 +-
 .../org/apache/arrow/memory/AllocationManager.java |  55 ++--
 .../org/apache/arrow/memory/BufferAllocator.java   |  31 ++
 .../org/apache/arrow/memory/ForeignAllocation.java |  58 ++++
 .../arrow/memory/ForeignAllocationManager.java     |  45 +++
 .../org/apache/arrow/memory/ReferenceManager.java  |  15 +-
 .../apache/arrow/memory/TestForeignAllocation.java |  72 +++++
 14 files changed, 820 insertions(+), 202 deletions(-)

diff --git a/java/.gitignore b/java/.gitignore
index 59c2e7b2a0..07e84864a3 100644
--- a/java/.gitignore
+++ b/java/.gitignore
@@ -21,8 +21,6 @@ arrow-git.properties
 cmake_install.cmake
 install_manifest.txt
 target/
-?/
-!/c/
 
 # Generated properties file
 flight/flight-sql-jdbc-driver/src/main/resources/properties/flight.properties
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
index 4da3806575..7132887dde 100644
--- a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
@@ -22,13 +22,13 @@ import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
 import static org.apache.arrow.util.Preconditions.checkNotNull;
 import static org.apache.arrow.util.Preconditions.checkState;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TypeLayout;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -44,12 +44,12 @@ final class ArrayImporter {
   private final FieldVector vector;
   private final DictionaryProvider dictionaryProvider;
 
-  private CDataReferenceManager referenceManager;
+  private ReferenceCountedArrowArray underlyingAllocation;
   private int recursionLevel;
 
   ArrayImporter(BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
-    this.allocator = allocator;
-    this.vector = vector;
+    this.allocator = Preconditions.checkNotNull(allocator);
+    this.vector = Preconditions.checkNotNull(vector);
     this.dictionaryProvider = dictionaryProvider;
   }
 
@@ -66,12 +66,11 @@ final class ArrayImporter {
     recursionLevel = 0;
 
     // This keeps the array alive as long as there are any buffers that need it
-    referenceManager = new CDataReferenceManager(ownedArray);
+    underlyingAllocation = new ReferenceCountedArrowArray(ownedArray);
     try {
-      referenceManager.increment();
       doImport(snapshot);
     } finally {
-      referenceManager.release();
+      underlyingAllocation.release();
     }
   }
 
@@ -81,9 +80,7 @@ final class ArrayImporter {
     recursionLevel = parent.recursionLevel + 1;
     checkState(recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, "Recursion level in ArrowArray struct exceeded");
     // Child buffers will keep the entire parent import alive.
-    // Perhaps we can move the child structs on import,
-    // but that is another level of complication.
-    referenceManager = parent.referenceManager;
+    underlyingAllocation = parent.underlyingAllocation;
     doImport(snapshot);
   }
 
@@ -118,36 +115,20 @@ final class ArrayImporter {
 
     // Import main data
     ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
-    List<ArrowBuf> buffers = importBuffers(snapshot);
-    try {
+    long[] bufferPointers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
+
+    try (final BufferImportTypeVisitor visitor = new BufferImportTypeVisitor(
+        allocator, underlyingAllocation, fieldNode, bufferPointers)) {
+      final List<ArrowBuf> buffers;
+      if (bufferPointers == null || bufferPointers.length == 0) {
+        buffers = Collections.emptyList();
+      } else {
+        buffers = vector.getField().getType().accept(visitor);
+      }
       vector.loadFieldBuffers(fieldNode, buffers);
-    } catch (RuntimeException e) {
+    } catch (Exception e) {
       throw new IllegalArgumentException(
           "Could not load buffers for field " + vector.getField() + ". error message: " + e.getMessage(), e);
     }
   }
-
-  private List<ArrowBuf> importBuffers(ArrowArray.Snapshot snapshot) {
-    long[] buffers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
-    if (buffers == null || buffers.length == 0) {
-      return new ArrayList<>();
-    }
-
-    int buffersCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
-    checkState(buffers.length == buffersCount, "Expected %s buffers for imported type %s, ArrowArray struct has %s",
-        buffersCount, vector.getField().getType().getTypeID(), buffers.length);
-
-    List<ArrowBuf> result = new ArrayList<>(buffersCount);
-    for (long bufferPtr : buffers) {
-      ArrowBuf buffer = null;
-      if (bufferPtr != NULL) {
-        // See ARROW-17720: [Java] C data interface: Add API to compute imported buffer size
-        int capacity = Integer.MAX_VALUE;
-        buffer = new ArrowBuf(referenceManager, null, capacity, bufferPtr);
-        buffer.writerIndex(capacity);
-      }
-      result.add(buffer);
-    }
-    return result;
-  }
 }
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
index 99fe0432c1..a538852f47 100644
--- a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
@@ -28,6 +28,7 @@ import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.ReferenceManager;
 import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.VisibleForTesting;
 
 /**
  * C Data Interface ArrowArray.
@@ -149,6 +150,11 @@ public class ArrowArray implements BaseStruct {
     }
   }
 
+  @VisibleForTesting
+  boolean isClosed() {
+    return data == null;
+  }
+
   private ByteBuffer directBuffer() {
     return MemoryUtil.directBuffer(memoryAddress(), ArrowArray.SIZE_OF).order(ByteOrder.nativeOrder());
   }
diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
new file mode 100644
index 0000000000..c8b6d07086
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.apache.arrow.c.NativeUtil.NULL;
+import static org.apache.arrow.util.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DurationVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntervalDayVector;
+import org.apache.arrow.vector.IntervalMonthDayNanoVector;
+import org.apache.arrow.vector.IntervalYearVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.DenseUnionVector;
+import org.apache.arrow.vector.complex.LargeListVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.DataSizeRoundingUtil;
+
+/**
+ * Import buffers from a C Data Interface struct.
+ */
+class BufferImportTypeVisitor implements ArrowType.ArrowTypeVisitor<List<ArrowBuf>>, AutoCloseable {
+  private final BufferAllocator allocator;
+  private final ReferenceCountedArrowArray underlyingAllocation;
+  private final ArrowFieldNode fieldNode;
+  private final long[] buffers;
+  private final List<ArrowBuf> imported;
+
+  BufferImportTypeVisitor(BufferAllocator allocator, ReferenceCountedArrowArray underlyingAllocation,
+                          ArrowFieldNode fieldNode, long[] buffers) {
+    this.allocator = allocator;
+    this.underlyingAllocation = underlyingAllocation;
+    this.fieldNode = fieldNode;
+    this.buffers = buffers;
+    this.imported = new ArrayList<>();
+  }
+
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(imported);
+  }
+
+  @VisibleForTesting
+  long getBufferPtr(ArrowType type, int index) {
+    checkState(index < buffers.length,
+        "Expected at least %s buffers for type %s, but found %s", index + 1, type, buffers.length);
+    final long address = buffers[index];  // NULL entries are rejected just below
+    if (address != NULL) {
+      return address;
+    }
+    throw new IllegalStateException(String.format("Buffer %s for type %s cannot be null", index, type));
+  }
+
+  private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) {
+    final long address = getBufferPtr(type, index);  // throws if the pointer is missing or NULL
+    final long byteCount = DataSizeRoundingUtil.divideBy8Ceil(bitsPerSlot * fieldNode.getLength());
+    final ArrowBuf wrapped = underlyingAllocation.unsafeAssociateAllocation(allocator, byteCount, address);
+    imported.add(wrapped);
+    return wrapped;
+  }
+
+  private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) {
+    final long address = getBufferPtr(type, index);  // throws if the pointer is missing or NULL
+    final long byteCount = bytesPerSlot * fieldNode.getLength();
+    final ArrowBuf wrapped = underlyingAllocation.unsafeAssociateAllocation(allocator, byteCount, address);
+    imported.add(wrapped);
+    return wrapped;
+  }
+
+  private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) {
+    final long bufferPtr = getBufferPtr(type, 1);  // offsets are read from buffer index 1
+    final long capacity = bytesPerSlot * (fieldNode.getLength() + 1L);  // 1L: sum in long, no int wrap
+    ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+    this.imported.add(buf);
+    return buf;
+  }
+
+  private ArrowBuf importData(ArrowType type, long capacity) {
+    final long address = getBufferPtr(type, 2);  // data is read from buffer index 2
+    final ArrowBuf wrapped = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, address);
+    imported.add(wrapped);
+    return wrapped;
+  }
+
+  private ArrowBuf maybeImportBitmap(ArrowType type) {
+    final int count = buffers.length;
+    checkState(count > 0, "Expected at least %s buffers for type %s, but found %s", 1, type, count);
+    // A NULL pointer in slot 0 means there is no bitmap to wrap; report that as a null ArrowBuf.
+    if (buffers[0] != NULL) {
+      return importFixedBits(type, 0, /*bitsPerSlot=*/1);
+    }
+    return null;
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Null type) {
+    checkState(
+        buffers.length == 0,
+        "Expected %s buffers for type %s, but found %s", 0, type, buffers.length);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Struct type) {
+    return Collections.singletonList(maybeImportBitmap(type));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.List type) {
+    return Arrays.asList(maybeImportBitmap(type), importOffsets(type, ListVector.OFFSET_WIDTH));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.LargeList type) {
+    return Arrays.asList(maybeImportBitmap(type), importOffsets(type, LargeListVector.OFFSET_WIDTH));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.FixedSizeList type) {
+    return Collections.singletonList(maybeImportBitmap(type));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Union type) {
+    switch (type.getMode()) {
+      case Sparse:
+        return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH));
+      case Dense:
+        return Arrays.asList(importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH),
+            importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Map type) {
+    return Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Int type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth()));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.FloatingPoint type) {
+    switch (type.getPrecision()) {
+      case HALF:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, /*bytesPerSlot=*/2));
+      case SINGLE:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float4Vector.TYPE_WIDTH));
+      case DOUBLE:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float8Vector.TYPE_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Utf8 type) {
+    try (ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH)) {
+      final int start = offsets.getInt(0);  // first offset
+      final int end = offsets.getInt(fieldNode.getLength() * (long) VarCharVector.OFFSET_WIDTH);
+      checkState(
+          end >= start,
+          "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+      final int len = end - start;  // passed to importData as the data buffer capacity
+      offsets.getReferenceManager().retain();  // compensates for the close() when this try block exits
+      return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.LargeUtf8 type) {
+    try (ArrowBuf offsets = importOffsets(type, LargeVarCharVector.OFFSET_WIDTH)) {
+      final long start = offsets.getLong(0);  // first offset
+      final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarCharVector.OFFSET_WIDTH);
+      checkState(
+          end >= start,
+          "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+      final long len = end - start;  // passed to importData as the data buffer capacity
+      offsets.getReferenceManager().retain();  // compensates for the close() when this try block exits
+      return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Binary type) {
+    try (ArrowBuf offsets = importOffsets(type, VarBinaryVector.OFFSET_WIDTH)) {
+      final int start = offsets.getInt(0);  // first offset
+      final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH);
+      checkState(
+          end >= start,
+          "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+      final int len = end - start;  // passed to importData as the data buffer capacity
+      offsets.getReferenceManager().retain();  // compensates for the close() when this try block exits
+      return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.LargeBinary type) {
+    try (ArrowBuf offsets = importOffsets(type, LargeVarBinaryVector.OFFSET_WIDTH)) {
+      final long start = offsets.getLong(0);  // first offset
+      // TODO: need better tests to cover the failure when I forget to multiply by offset width
+      final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarBinaryVector.OFFSET_WIDTH);
+      checkState(
+          end >= start,
+          "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+      final long len = end - start;  // passed to importData as the data buffer capacity
+      offsets.getReferenceManager().retain();  // compensates for the close() when this try block exits
+      return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.FixedSizeBinary type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, type.getByteWidth()));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Bool type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, /*bitsPerSlot=*/1));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Decimal type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth()));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Date type) {
+    switch (type.getUnit()) {
+      case DAY:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateDayVector.TYPE_WIDTH));
+      case MILLISECOND:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateMilliVector.TYPE_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Time type) {
+    switch (type.getUnit()) {
+      case SECOND:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeSecVector.TYPE_WIDTH));
+      case MILLISECOND:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMilliVector.TYPE_WIDTH));
+      case MICROSECOND:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMicroVector.TYPE_WIDTH));
+      case NANOSECOND:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeNanoVector.TYPE_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Timestamp type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeStampVector.TYPE_WIDTH));
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Interval type) {
+    switch (type.getUnit()) {
+      case YEAR_MONTH:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalYearVector.TYPE_WIDTH));
+      case DAY_TIME:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalDayVector.TYPE_WIDTH));
+      case MONTH_DAY_NANO:
+        return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalMonthDayNanoVector.TYPE_WIDTH));
+      default:
+        throw new UnsupportedOperationException("Importing buffers for type: " + type);
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> visit(ArrowType.Duration type) {
+    return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DurationVector.TYPE_WIDTH));
+  }
+}
diff --git a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java b/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java
deleted file mode 100644
index c5c2f97790..0000000000
--- a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.c;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OwnershipTransferResult;
-import org.apache.arrow.memory.ReferenceManager;
-import org.apache.arrow.util.Preconditions;
-
-/**
- * A ReferenceManager implementation that holds a
- * {@link org.apache.arrow.c.BaseStruct}.
- * <p>
- * A reference count is maintained and once it reaches zero the struct is
- * released (as per the C data interface specification) and closed.
- */
-final class CDataReferenceManager implements ReferenceManager {
-  private final AtomicInteger bufRefCnt = new AtomicInteger(0);
-
-  private final BaseStruct struct;
-
-  CDataReferenceManager(BaseStruct struct) {
-    this.struct = struct;
-  }
-
-  @Override
-  public int getRefCount() {
-    return bufRefCnt.get();
-  }
-
-  @Override
-  public boolean release() {
-    return release(1);
-  }
-
-  /**
-   * Increment the reference count without any safety checks.
-   */
-  void increment() {
-    bufRefCnt.incrementAndGet();
-  }
-
-  @Override
-  public boolean release(int decrement) {
-    Preconditions.checkState(decrement >= 1, "ref count decrement should be greater than or equal to 1");
-    // decrement the ref count
-    final int refCnt = bufRefCnt.addAndGet(-decrement);
-    // the new ref count should be >= 0
-    Preconditions.checkState(refCnt >= 0, "ref count has gone negative");
-    if (refCnt == 0) {
-      // refcount of this reference manager has dropped to 0
-      // release the underlying memory
-      struct.release();
-      struct.close();
-    }
-    return refCnt == 0;
-  }
-
-  @Override
-  public void retain() {
-    retain(1);
-  }
-
-  @Override
-  public void retain(int increment) {
-    Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
-    final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
-    Preconditions.checkState(originalReferenceCount > 0, "retain called but memory was already released");
-  }
-
-  @Override
-  public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
-    retain();
-
-    ArrowBuf targetArrowBuf = this.deriveBuffer(srcBuffer, 0, srcBuffer.capacity());
-    targetArrowBuf.readerIndex(srcBuffer.readerIndex());
-    targetArrowBuf.writerIndex(srcBuffer.writerIndex());
-    return targetArrowBuf;
-  }
-
-  @Override
-  public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) {
-    final long derivedBufferAddress = sourceBuffer.memoryAddress() + index;
-    return new ArrowBuf(this, null, length, derivedBufferAddress);
-  }
-
-  @Override
-  public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public BufferAllocator getAllocator() {
-    return null;
-  }
-
-  @Override
-  public long getSize() {
-    return 0L;
-  }
-
-  @Override
-  public long getAccountedSize() {
-    return 0L;
-  }
-}
diff --git a/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java
new file mode 100644
index 0000000000..f09f14817b
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.ForeignAllocation;
+
+/**
+ * The owner of an imported C Data Interface array.
+ *
+ * <p>There is a fundamental mismatch here between memory allocation schemes: AllocationManager represents a single
+ * allocation (= a single address and length). But an ArrowArray combines multiple allocations behind a single
+ * deallocation callback. This class bridges the two by tracking a reference count, so that the single callback
+ * can be managed by multiple {@link ForeignAllocation} instances.
+ */
+final class ReferenceCountedArrowArray {
+  private final ArrowArray array;
+  private final AtomicInteger refCnt;
+
+  ReferenceCountedArrowArray(ArrowArray array) {
+    this.array = array;
+    this.refCnt = new AtomicInteger(1);
+  }
+
+  void retain() {
+    if (refCnt.getAndIncrement() <= 0) {  // a previous count of zero or less means already released
+      throw new IllegalStateException("Tried to retain a released ArrowArray");
+    }
+  }
+
+  void release() {
+    final int remaining = refCnt.decrementAndGet();
+    if (remaining < 0) {
+      throw new IllegalStateException("Reference count went negative for imported ArrowArray");
+    } else if (remaining == 0) {  // last reference gone: release, then close the struct
+      array.release();
+      array.close();
+    }
+  }
+
+  /**
+   * Create an ArrowBuf wrapping a buffer from this ArrowArray associated with the given BufferAllocator.
+   *
+   * <p>This method is "unsafe" because there is no validation of the given capacity or address. If the returned
+   * buffer is not freed, a memory leak will occur.
+   */
+  ArrowBuf unsafeAssociateAllocation(BufferAllocator trackingAllocator, long capacity, long memoryAddress) {
+    retain();  // one reference per wrapped buffer; dropped again by release0() below
+    return trackingAllocator.wrapForeignAllocation(new ForeignAllocation(capacity, memoryAddress) {
+      @Override
+      protected void release0() {
+        ReferenceCountedArrowArray.this.release();  // forwards to the shared reference count
+      }
+    });
+  }
+}
diff --git a/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java
new file mode 100644
index 0000000000..2d31089ca7
--- /dev/null
+++ b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.ReferenceManager;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ArrowArrayUtilityTest {
+  BufferAllocator allocator;
+  ArrowArray arrowArray;
+  ReferenceCountedArrowArray dummyHandle;
+
+  @BeforeEach
+  void beforeEach() {
+    allocator = new RootAllocator();
+    arrowArray = ArrowArray.allocateNew(allocator);
+    dummyHandle = new ReferenceCountedArrowArray(arrowArray);
+  }
+
+  @AfterEach
+  void afterEach() {
+    dummyHandle.release();
+    allocator.close();
+  }
+
+  // ------------------------------------------------------------
+  // BufferImportTypeVisitor
+
+  @Test
+  void getBufferPtr() throws Exception {
+    // Note values are all dummy values here
+    try (BufferImportTypeVisitor visitor =
+        new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[]{0})) {
+
+      // Too few buffers
+      assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 1));
+
+      // Null where one isn't expected
+      assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 0));
+    }
+  }
+
+  @Test
+  void cleanupAfterFailure() throws Exception {
+    // Note values are all dummy values here
+    long address = MemoryUtil.UNSAFE.allocateMemory(16);
+    try (BufferImportTypeVisitor visitor =
+             new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[] {address})) {
+      // This fails, but only after we've already imported a buffer.
+      assertThrows(IllegalStateException.class, () -> visitor.visit(new ArrowType.Int(32, true)));
+    } finally {
+      MemoryUtil.UNSAFE.freeMemory(address);
+    }
+  }
+
+  @Test
+  void bufferAssociatedWithAllocator() throws Exception {
+    // Note values are all dummy values here
+    final long bufferSize = 16;
+    final long fieldLength = bufferSize / IntVector.TYPE_WIDTH;
+    long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize);
+    long baseline = allocator.getAllocatedMemory();
+    ArrowFieldNode fieldNode = new ArrowFieldNode(fieldLength, 0);
+    try (BufferImportTypeVisitor visitor =
+             new BufferImportTypeVisitor(allocator, dummyHandle, fieldNode, new long[] {0, address})) {
+      List<ArrowBuf> buffers = visitor.visit(new ArrowType.Int(32, true));
+      assertThat(buffers).hasSize(2);
+      assertThat(buffers.get(0)).isNull();
+      assertThat(buffers.get(1))
+          .isNotNull()
+          .extracting(ArrowBuf::getReferenceManager)
+          .extracting(ReferenceManager::getAllocator)
+          .isEqualTo(allocator);
+      assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline + bufferSize);
+    } finally {
+      MemoryUtil.UNSAFE.freeMemory(address);
+    }
+    assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline);
+  }
+
+  // ------------------------------------------------------------
+  // ReferenceCountedArrowArray
+
+  @Test
+  void releaseRetain() {
+    ArrowArray array = ArrowArray.allocateNew(allocator);
+    ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array);
+    assertThat(array.isClosed()).isFalse();
+    handle.retain();
+    assertThat(array.isClosed()).isFalse();
+    handle.release();
+    assertThat(array.isClosed()).isFalse();
+    handle.release();
+    assertThat(array.isClosed()).isTrue();
+
+    assertThrows(IllegalStateException.class, handle::release);
+    assertThrows(IllegalStateException.class, handle::retain);
+  }
+
+  @Test
+  void associate() {
+    final long bufferSize = 16;
+    final long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize);
+    try {
+      ArrowArray array = ArrowArray.allocateNew(allocator);
+      ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array);
+      assertThat(array.isClosed()).isFalse();
+      ArrowBuf buf = handle.unsafeAssociateAllocation(allocator, bufferSize, address);
+      assertThat(array.isClosed()).isFalse();
+      buf.close();
+      assertThat(array.isClosed()).isFalse();
+      handle.release();
+      assertThat(array.isClosed()).isTrue();
+    } finally {
+      MemoryUtil.UNSAFE.freeMemory(address);
+    }
+  }
+}
diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
index 8aa9f85c25..fc73df449b 100644
--- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
@@ -99,6 +99,7 @@ import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.TransferPair;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -106,14 +107,17 @@ import org.junit.jupiter.api.Test;
 public class RoundtripTest {
   private static final String EMPTY_SCHEMA_PATH = "";
   private RootAllocator allocator = null;
+  private BufferAllocator childAllocator = null;
 
   @BeforeEach
   public void setUp() {
     allocator = new RootAllocator(Long.MAX_VALUE);
+    childAllocator = allocator.newChildAllocator("child", 0, Long.MAX_VALUE);
   }
 
   @AfterEach
   public void tearDown() {
+    childAllocator.close();
     allocator.close();
   }
 
@@ -130,7 +134,15 @@ public class RoundtripTest {
       }
 
       // Consumer imports vector
-      return Data.importVector(allocator, consumerArrowArray, consumerArrowSchema, null);
+      FieldVector imported = Data.importVector(childAllocator, consumerArrowArray, consumerArrowSchema, null);
+      if (!(imported instanceof NullVector)) {
+        assertEquals(childAllocator, imported.getAllocator());
+      }
+
+      // Check that transfers work
+      TransferPair pair = imported.getTransferPair(allocator);
+      pair.transfer();
+      return (FieldVector) pair.getTo();
     }
   }
 
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
index 5f8ab12446..3071c02f30 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -17,48 +17,38 @@
 
 package org.apache.arrow.memory;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.arrow.util.Preconditions;
 
 /**
- * The abstract base class of AllocationManager.
+ * An AllocationManager is the implementation of a physical memory allocation.
  *
- * <p>Manages the relationship between one or more allocators and a particular UDLE. Ensures that
- * one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
- * associated allocators.
+ * <p>Manages the relationship between the allocators and a particular memory allocation. Ensures that
+ * one allocator owns the memory that multiple allocators may be referencing. Manages a BufferLedger between
+ * each of its associated allocators. It does not track the reference count; that is the role of {@link BufferLedger}
+ * (aka {@link ReferenceManager}).
  *
- * <p>The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
- * package which need access
- * to these objects or methods.
+ * <p>This is a public extension point subclassed by concrete allocator implementations (e.g. Netty or Unsafe).
  *
  * <p>Threading: AllocationManager manages thread-safety internally. Operations within the context
- * of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the
- * context of two ledgers
- * will acquire a lock on the AllocationManager instance. Important note, there is one
- * AllocationManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a
- * typical query. The
- * contention of acquiring a lock on AllocationManager should be very low.
+ * of a single BufferLedger are lockless in nature and can be leveraged by multiple threads. Operations that cross the
+ * context of two ledgers will acquire a lock on the AllocationManager instance. Important note, there is one
+ * AllocationManager per physical buffer allocation. As such, there will be thousands of these in a
+ * typical query. The contention of acquiring a lock on AllocationManager should be very low.
  */
 public abstract class AllocationManager {
-
-  private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
-
+  // The RootAllocator we are associated with. An allocation can only ever be associated with a single RootAllocator.
   private final BufferAllocator root;
-  private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
-  // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
-  // see JIRA for details
+  // An allocation can be tracked by multiple allocators. (This is because an allocator is more like a ledger.)
+  // All such allocators track reference counts individually, via BufferLedger instances. When an individual
+  // reference count reaches zero, the allocator will be dissociated from this allocation. If that was via the
+  // owningLedger, then no more allocators should be tracking this allocation, and the allocation will be freed.
+  // ARROW-1627: Trying to minimize memory overhead caused by previously used IdentityHashMap
   private final LowCostIdentityHashMap<BufferAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
-  private final long amCreationTime = System.nanoTime();
-
-  // The ReferenceManager created at the time of creation of this AllocationManager
-  // is treated as the owning reference manager for the underlying chunk of memory
-  // managed by this allocation manager
+  // The primary BufferLedger (i.e. reference count) tracking this allocation.
+  // This is mostly a semantic constraint on the API user: if the reference count reaches 0 in the owningLedger, then
+  // there are not supposed to be any references through other allocators. In practice, this doesn't do anything
+  // as the implementation just forces ownership to be transferred to one of the other extant references.
   private volatile BufferLedger owningLedger;
-  private volatile long amDestructionTime = 0;
 
   protected AllocationManager(BufferAllocator accountingAllocator) {
     Preconditions.checkNotNull(accountingAllocator);
@@ -81,7 +71,7 @@ public abstract class AllocationManager {
 
   /**
    * Associate the existing underlying buffer with a new allocator. This will increase the
-   * reference count on the corresponding buffer ledger by 1
+   * reference count on the corresponding buffer ledger by 1.
    *
    * @param allocator The target allocator to associate this buffer with.
    * @return The reference manager (new or existing) that associates the underlying
@@ -99,6 +89,7 @@ public abstract class AllocationManager {
     synchronized (this) {
       BufferLedger ledger = map.get(allocator);
       if (ledger != null) {
+        // We were already being tracked by the given allocator, just return it
         if (retain) {
           // bump the ref count for the ledger
           ledger.increment();
@@ -106,6 +97,7 @@ public abstract class AllocationManager {
         return ledger;
       }
 
+      // We weren't previously being tracked by the given allocator; create a new ledger
       ledger = new BufferLedger(allocator, this);
 
       if (retain) {
@@ -161,7 +153,6 @@ public abstract class AllocationManager {
         // free the memory chunk associated with the allocation manager
         release0();
         oldAllocator.getListener().onRelease(getSize());
-        amDestructionTime = System.nanoTime();
         owningLedger = null;
       } else {
         // since the refcount dropped to 0 for the owning reference manager and allocation
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index e59349c649..bb3816d9c4 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -235,4 +235,35 @@ public interface BufferAllocator extends AutoCloseable {
   default RoundingPolicy getRoundingPolicy() {
     return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
   }
+
+  /**
+   * EXPERIMENTAL: Wrap an allocation created outside this BufferAllocator.
+   *
+   * <p>This is useful to integrate allocations from native code into the same memory management framework as
+   * Java-allocated buffers, presenting users a consistent API. The created buffer will be tracked by this allocator
+   * and can be transferred like Java-allocated buffers.
+   *
+   * <p>The underlying allocation will be closed when all references to the buffer are released. If this method throws,
+   * the underlying allocation will also be closed.
+   *
+   * @param allocation The underlying allocation.
+   */
+  default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
+    try {
+      forceAllocate(allocation.getSize());
+      final AllocationManager manager = new ForeignAllocationManager(this, allocation);
+      final BufferLedger ledger = manager.associate(this);
+      final ArrowBuf buf =
+          new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(), allocation.memoryAddress());
+      buf.writerIndex(allocation.getSize());
+      return buf;
+    } catch (Throwable t) {
+      try {
+        allocation.release0();
+      } catch (Throwable e) {
+        t.addSuppressed(e);
+      }
+      throw t;
+    }
+  }
 }
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java
new file mode 100644
index 0000000000..c1b47382a3
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * EXPERIMENTAL: a memory allocation that does not come from a BufferAllocator, but rather an outside source (like JNI).
+ *
+ * <p>To use this, subclass this class and implement {@link #release0()} to free the allocation.
+ */
+public abstract class ForeignAllocation {
+  private final long memoryAddress;
+  private final long size;
+
+  /**
+   * Create a new ForeignAllocation representing an imported buffer.
+   *
+   * @param size The buffer size.
+   * @param memoryAddress The buffer address.
+   */
+  protected ForeignAllocation(long size, long memoryAddress) {
+    this.memoryAddress = memoryAddress;
+    this.size = size;
+  }
+
+  /**
+   * Get the size of this allocation.
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * Get the address of this allocation.
+   */
+  protected long memoryAddress() {
+    return memoryAddress;
+  }
+
+  /**
+   * Free this allocation. Will only be called once.
+   */
+  protected abstract void release0();
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java
new file mode 100644
index 0000000000..741b866f81
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * An AllocationManager wrapping a ForeignAllocation.
+ */
+class ForeignAllocationManager extends AllocationManager {
+  private final ForeignAllocation allocation;
+
+  protected ForeignAllocationManager(BufferAllocator accountingAllocator, ForeignAllocation allocation) {
+    super(accountingAllocator);
+    this.allocation = allocation;
+  }
+
+  @Override
+  public long getSize() {
+    return allocation.getSize();
+  }
+
+  @Override
+  protected long memoryAddress() {
+    return allocation.memoryAddress();
+  }
+
+  @Override
+  protected void release0() {
+    allocation.release0();
+  }
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
index 00ae274b74..7d4de18751 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
@@ -18,8 +18,10 @@
 package org.apache.arrow.memory;
 
 /**
- * Reference Manager manages one or more ArrowBufs that share the
- * reference count for the underlying memory chunk.
+ * ReferenceManager is the reference count for one or more allocations.
+ *
+ * <p>In order to integrate with the core {@link BufferAllocator} implementation, the allocation itself should
+ * be represented by an {@link AllocationManager}, though this is not required by the API.
  */
 public interface ReferenceManager {
 
@@ -70,6 +72,8 @@ public interface ReferenceManager {
    * the target allocator-reference manager combination + 1 in the case that the provided allocator
    * already had an association to this underlying memory.
    *
+   * <p>The underlying allocation ({@link AllocationManager}) will not be copied.
+   *
    * @param srcBuffer source ArrowBuf
    * @param targetAllocator The target allocator to create an association with.
    * @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf.
@@ -89,9 +93,10 @@ public interface ReferenceManager {
   ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length);
 
   /**
-   * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
-   * This will generate a new ArrowBuf that carries an association with the underlying memory
-   * for the given ArrowBuf
+   * Duplicate the memory accounting ownership of the backing allocation of the given ArrowBuf in another allocator.
+   * This will generate a new ArrowBuf that carries an association with the same underlying memory
+   * ({@link AllocationManager}s) as the given ArrowBuf.
+   *
    * @param sourceBuffer source ArrowBuf
    * @param targetAllocator The target allocator to create an association with
    * @return {@link OwnershipTransferResult} with info on transfer result and new buffer
diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
new file mode 100644
index 0000000000..5e40645e06
--- /dev/null
+++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestForeignAllocation {
+  BufferAllocator allocator;
+
+  @Before
+  public void before() {
+    allocator = new RootAllocator();
+  }
+
+  @After
+  public void after() {
+    allocator.close();
+  }
+
+  @Test
+  public void wrapForeignAllocation() {
+    final long bufferSize = 16;
+    UnsafeForeignAllocation allocation = new UnsafeForeignAllocation(bufferSize);
+    try {
+      assertEquals(0, allocator.getAllocatedMemory());
+      ArrowBuf buf = allocator.wrapForeignAllocation(allocation);
+      assertEquals(bufferSize, buf.capacity());
+      buf.close();
+      assertTrue(allocation.released);
+    } finally {
+      allocation.release0();
+    }
+    assertEquals(0, allocator.getAllocatedMemory());
+  }
+
+  private static class UnsafeForeignAllocation extends ForeignAllocation {
+    boolean released = false;
+
+    public UnsafeForeignAllocation(long bufferSize) {
+      super(bufferSize, MemoryUtil.UNSAFE.allocateMemory(bufferSize));
+    }
+
+    @Override
+    protected void release0() {
+      if (!released) {
+        MemoryUtil.UNSAFE.freeMemory(memoryAddress());
+        released = true;
+      }
+    }
+  }
+}