You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/11/23 16:27:23 UTC
[arrow] branch master updated: ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7a47e8dced ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)
7a47e8dced is described below
commit 7a47e8dcedd2f3198df90d68650a5e9b5597bb58
Author: David Li <li...@gmail.com>
AuthorDate: Wed Nov 23 11:27:16 2022 -0500
ARROW-16673: [Java] Integrate C Data into allocator hierarchy (#14506)
- Document some of the memory management internals
- Note that in practice, you have to use the concrete implementations of certain interfaces
- Begin integrating C Data into the full memory management stack instead of trying to patch it in at the edges
Authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
java/.gitignore | 2 -
.../java/org/apache/arrow/c/ArrayImporter.java | 57 ++--
.../main/java/org/apache/arrow/c/ArrowArray.java | 6 +
.../apache/arrow/c/BufferImportTypeVisitor.java | 322 +++++++++++++++++++++
.../org/apache/arrow/c/CDataReferenceManager.java | 124 --------
.../apache/arrow/c/ReferenceCountedArrowArray.java | 74 +++++
.../org/apache/arrow/c/ArrowArrayUtilityTest.java | 147 ++++++++++
.../java/org/apache/arrow/c/RoundtripTest.java | 14 +-
.../org/apache/arrow/memory/AllocationManager.java | 55 ++--
.../org/apache/arrow/memory/BufferAllocator.java | 31 ++
.../org/apache/arrow/memory/ForeignAllocation.java | 58 ++++
.../arrow/memory/ForeignAllocationManager.java | 45 +++
.../org/apache/arrow/memory/ReferenceManager.java | 15 +-
.../apache/arrow/memory/TestForeignAllocation.java | 72 +++++
14 files changed, 820 insertions(+), 202 deletions(-)
diff --git a/java/.gitignore b/java/.gitignore
index 59c2e7b2a0..07e84864a3 100644
--- a/java/.gitignore
+++ b/java/.gitignore
@@ -21,8 +21,6 @@ arrow-git.properties
cmake_install.cmake
install_manifest.txt
target/
-?/
-!/c/
# Generated properties file
flight/flight-sql-jdbc-driver/src/main/resources/properties/flight.properties
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
index 4da3806575..7132887dde 100644
--- a/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
@@ -22,13 +22,13 @@ import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
import static org.apache.arrow.util.Preconditions.checkNotNull;
import static org.apache.arrow.util.Preconditions.checkState;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -44,12 +44,12 @@ final class ArrayImporter {
private final FieldVector vector;
private final DictionaryProvider dictionaryProvider;
- private CDataReferenceManager referenceManager;
+ private ReferenceCountedArrowArray underlyingAllocation;
private int recursionLevel;
ArrayImporter(BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
- this.allocator = allocator;
- this.vector = vector;
+ this.allocator = Preconditions.checkNotNull(allocator);
+ this.vector = Preconditions.checkNotNull(vector);
this.dictionaryProvider = dictionaryProvider;
}
@@ -66,12 +66,11 @@ final class ArrayImporter {
recursionLevel = 0;
// This keeps the array alive as long as there are any buffers that need it
- referenceManager = new CDataReferenceManager(ownedArray);
+ underlyingAllocation = new ReferenceCountedArrowArray(ownedArray);
try {
- referenceManager.increment();
doImport(snapshot);
} finally {
- referenceManager.release();
+ underlyingAllocation.release();
}
}
@@ -81,9 +80,7 @@ final class ArrayImporter {
recursionLevel = parent.recursionLevel + 1;
checkState(recursionLevel <= MAX_IMPORT_RECURSION_LEVEL, "Recursion level in ArrowArray struct exceeded");
// Child buffers will keep the entire parent import alive.
- // Perhaps we can move the child structs on import,
- // but that is another level of complication.
- referenceManager = parent.referenceManager;
+ underlyingAllocation = parent.underlyingAllocation;
doImport(snapshot);
}
@@ -118,36 +115,20 @@ final class ArrayImporter {
// Import main data
ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
- List<ArrowBuf> buffers = importBuffers(snapshot);
- try {
+ long[] bufferPointers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
+
+ try (final BufferImportTypeVisitor visitor = new BufferImportTypeVisitor(
+ allocator, underlyingAllocation, fieldNode, bufferPointers)) {
+ final List<ArrowBuf> buffers;
+ if (bufferPointers == null || bufferPointers.length == 0) {
+ buffers = Collections.emptyList();
+ } else {
+ buffers = vector.getField().getType().accept(visitor);
+ }
vector.loadFieldBuffers(fieldNode, buffers);
- } catch (RuntimeException e) {
+ } catch (Exception e) {
throw new IllegalArgumentException(
"Could not load buffers for field " + vector.getField() + ". error message: " + e.getMessage(), e);
}
}
-
- private List<ArrowBuf> importBuffers(ArrowArray.Snapshot snapshot) {
- long[] buffers = NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
- if (buffers == null || buffers.length == 0) {
- return new ArrayList<>();
- }
-
- int buffersCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
- checkState(buffers.length == buffersCount, "Expected %s buffers for imported type %s, ArrowArray struct has %s",
- buffersCount, vector.getField().getType().getTypeID(), buffers.length);
-
- List<ArrowBuf> result = new ArrayList<>(buffersCount);
- for (long bufferPtr : buffers) {
- ArrowBuf buffer = null;
- if (bufferPtr != NULL) {
- // See ARROW-17720: [Java] C data interface: Add API to compute imported buffer size
- int capacity = Integer.MAX_VALUE;
- buffer = new ArrowBuf(referenceManager, null, capacity, bufferPtr);
- buffer.writerIndex(capacity);
- }
- result.add(buffer);
- }
- return result;
- }
}
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
index 99fe0432c1..a538852f47 100644
--- a/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrowArray.java
@@ -28,6 +28,7 @@ import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.VisibleForTesting;
/**
* C Data Interface ArrowArray.
@@ -149,6 +150,11 @@ public class ArrowArray implements BaseStruct {
}
}
+ @VisibleForTesting
+ boolean isClosed() {
+ return data == null;
+ }
+
private ByteBuffer directBuffer() {
return MemoryUtil.directBuffer(memoryAddress(), ArrowArray.SIZE_OF).order(ByteOrder.nativeOrder());
}
diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
new file mode 100644
index 0000000000..c8b6d07086
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.apache.arrow.c.NativeUtil.NULL;
+import static org.apache.arrow.util.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DurationVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntervalDayVector;
+import org.apache.arrow.vector.IntervalMonthDayNanoVector;
+import org.apache.arrow.vector.IntervalYearVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.DenseUnionVector;
+import org.apache.arrow.vector.complex.LargeListVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.DataSizeRoundingUtil;
+
+/**
+ * Import buffers from a C Data Interface struct.
+ */
+class BufferImportTypeVisitor implements ArrowType.ArrowTypeVisitor<List<ArrowBuf>>, AutoCloseable {
+ private final BufferAllocator allocator;
+ private final ReferenceCountedArrowArray underlyingAllocation;
+ private final ArrowFieldNode fieldNode;
+ private final long[] buffers;
+ private final List<ArrowBuf> imported;
+
+ BufferImportTypeVisitor(BufferAllocator allocator, ReferenceCountedArrowArray underlyingAllocation,
+ ArrowFieldNode fieldNode, long[] buffers) {
+ this.allocator = allocator;
+ this.underlyingAllocation = underlyingAllocation;
+ this.fieldNode = fieldNode;
+ this.buffers = buffers;
+ this.imported = new ArrayList<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(imported);
+ }
+
+ @VisibleForTesting
+ long getBufferPtr(ArrowType type, int index) {
+ checkState(
+ buffers.length > index,
+ "Expected at least %s buffers for type %s, but found %s", index + 1, type, buffers.length);
+ if (buffers[index] == NULL) {
+ throw new IllegalStateException(String.format("Buffer %s for type %s cannot be null", index, type));
+ }
+ return buffers[index];
+ }
+
+ private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) {
+ final long bufferPtr = getBufferPtr(type, index);
+ final long capacity = DataSizeRoundingUtil.divideBy8Ceil(bitsPerSlot * fieldNode.getLength());
+ ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+ this.imported.add(buf);
+ return buf;
+ }
+
+ private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) {
+ final long bufferPtr = getBufferPtr(type, index);
+ final long capacity = bytesPerSlot * fieldNode.getLength();
+ ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+ this.imported.add(buf);
+ return buf;
+ }
+
+ private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) {
+ final long bufferPtr = getBufferPtr(type, 1);
+ final long capacity = bytesPerSlot * (fieldNode.getLength() + 1);
+ ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+ this.imported.add(buf);
+ return buf;
+ }
+
+ private ArrowBuf importData(ArrowType type, long capacity) {
+ final long bufferPtr = getBufferPtr(type, 2);
+ ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
+ this.imported.add(buf);
+ return buf;
+ }
+
+ private ArrowBuf maybeImportBitmap(ArrowType type) {
+ checkState(
+ buffers.length > 0,
+ "Expected at least %s buffers for type %s, but found %s", 1, type, buffers.length);
+ if (buffers[0] == NULL) {
+ return null;
+ }
+ return importFixedBits(type, 0, /*bitsPerSlot=*/1);
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Null type) {
+ checkState(
+ buffers.length == 0,
+ "Expected %s buffers for type %s, but found %s", 0, type, buffers.length);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Struct type) {
+ return Collections.singletonList(maybeImportBitmap(type));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.List type) {
+ return Arrays.asList(maybeImportBitmap(type), importOffsets(type, ListVector.OFFSET_WIDTH));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.LargeList type) {
+ return Arrays.asList(maybeImportBitmap(type), importOffsets(type, LargeListVector.OFFSET_WIDTH));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.FixedSizeList type) {
+ return Collections.singletonList(maybeImportBitmap(type));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Union type) {
+ switch (type.getMode()) {
+ case Sparse:
+ return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH));
+ case Dense:
+ return Arrays.asList(importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH),
+ importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH));
+ default:
+ throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Map type) {
+ return Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Int type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth()));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.FloatingPoint type) {
+ switch (type.getPrecision()) {
+ case HALF:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, /*bytesPerSlot=*/2));
+ case SINGLE:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float4Vector.TYPE_WIDTH));
+ case DOUBLE:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, Float8Vector.TYPE_WIDTH));
+ default:
+ throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Utf8 type) {
+ try (ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH)) {
+ final int start = offsets.getInt(0);
+ final int end = offsets.getInt(fieldNode.getLength() * (long) VarCharVector.OFFSET_WIDTH);
+ checkState(
+ end >= start,
+ "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+ final int len = end - start;
+ offsets.getReferenceManager().retain();
+ return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.LargeUtf8 type) {
+ try (ArrowBuf offsets = importOffsets(type, LargeVarCharVector.OFFSET_WIDTH)) {
+ final long start = offsets.getLong(0);
+ final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarCharVector.OFFSET_WIDTH);
+ checkState(
+ end >= start,
+ "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+ final long len = end - start;
+ offsets.getReferenceManager().retain();
+ return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Binary type) {
+ try (ArrowBuf offsets = importOffsets(type, VarBinaryVector.OFFSET_WIDTH)) {
+ final int start = offsets.getInt(0);
+ final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH);
+ checkState(
+ end >= start,
+ "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+ final int len = end - start;
+ offsets.getReferenceManager().retain();
+ return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.LargeBinary type) {
+ try (ArrowBuf offsets = importOffsets(type, LargeVarBinaryVector.OFFSET_WIDTH)) {
+ final long start = offsets.getLong(0);
+ // TODO: need better tests to cover the failure when I forget to multiply by offset width
+ final long end = offsets.getLong(fieldNode.getLength() * (long) LargeVarBinaryVector.OFFSET_WIDTH);
+ checkState(
+ end >= start,
+ "Offset buffer for type %s is malformed: start: %s, end: %s", type, start, end);
+ final long len = end - start;
+ offsets.getReferenceManager().retain();
+ return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.FixedSizeBinary type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, type.getByteWidth()));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Bool type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, /*bitsPerSlot=*/1));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Decimal type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth()));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Date type) {
+ switch (type.getUnit()) {
+ case DAY:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateDayVector.TYPE_WIDTH));
+ case MILLISECOND:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DateMilliVector.TYPE_WIDTH));
+ default:
+ throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Time type) {
+ switch (type.getUnit()) {
+ case SECOND:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeSecVector.TYPE_WIDTH));
+ case MILLISECOND:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMilliVector.TYPE_WIDTH));
+ case MICROSECOND:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeMicroVector.TYPE_WIDTH));
+ case NANOSECOND:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeNanoVector.TYPE_WIDTH));
+ default:
+ throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Timestamp type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, TimeStampVector.TYPE_WIDTH));
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Interval type) {
+ switch (type.getUnit()) {
+ case YEAR_MONTH:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalYearVector.TYPE_WIDTH));
+ case DAY_TIME:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalDayVector.TYPE_WIDTH));
+ case MONTH_DAY_NANO:
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, IntervalMonthDayNanoVector.TYPE_WIDTH));
+ default:
+ throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ }
+ }
+
+ @Override
+ public List<ArrowBuf> visit(ArrowType.Duration type) {
+ return Arrays.asList(maybeImportBitmap(type), importFixedBytes(type, 1, DurationVector.TYPE_WIDTH));
+ }
+}
diff --git a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java b/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java
deleted file mode 100644
index c5c2f97790..0000000000
--- a/java/c/src/main/java/org/apache/arrow/c/CDataReferenceManager.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.c;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OwnershipTransferResult;
-import org.apache.arrow.memory.ReferenceManager;
-import org.apache.arrow.util.Preconditions;
-
-/**
- * A ReferenceManager implementation that holds a
- * {@link org.apache.arrow.c.BaseStruct}.
- * <p>
- * A reference count is maintained and once it reaches zero the struct is
- * released (as per the C data interface specification) and closed.
- */
-final class CDataReferenceManager implements ReferenceManager {
- private final AtomicInteger bufRefCnt = new AtomicInteger(0);
-
- private final BaseStruct struct;
-
- CDataReferenceManager(BaseStruct struct) {
- this.struct = struct;
- }
-
- @Override
- public int getRefCount() {
- return bufRefCnt.get();
- }
-
- @Override
- public boolean release() {
- return release(1);
- }
-
- /**
- * Increment the reference count without any safety checks.
- */
- void increment() {
- bufRefCnt.incrementAndGet();
- }
-
- @Override
- public boolean release(int decrement) {
- Preconditions.checkState(decrement >= 1, "ref count decrement should be greater than or equal to 1");
- // decrement the ref count
- final int refCnt = bufRefCnt.addAndGet(-decrement);
- // the new ref count should be >= 0
- Preconditions.checkState(refCnt >= 0, "ref count has gone negative");
- if (refCnt == 0) {
- // refcount of this reference manager has dropped to 0
- // release the underlying memory
- struct.release();
- struct.close();
- }
- return refCnt == 0;
- }
-
- @Override
- public void retain() {
- retain(1);
- }
-
- @Override
- public void retain(int increment) {
- Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
- final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
- Preconditions.checkState(originalReferenceCount > 0, "retain called but memory was already released");
- }
-
- @Override
- public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
- retain();
-
- ArrowBuf targetArrowBuf = this.deriveBuffer(srcBuffer, 0, srcBuffer.capacity());
- targetArrowBuf.readerIndex(srcBuffer.readerIndex());
- targetArrowBuf.writerIndex(srcBuffer.writerIndex());
- return targetArrowBuf;
- }
-
- @Override
- public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) {
- final long derivedBufferAddress = sourceBuffer.memoryAddress() + index;
- return new ArrowBuf(this, null, length, derivedBufferAddress);
- }
-
- @Override
- public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BufferAllocator getAllocator() {
- return null;
- }
-
- @Override
- public long getSize() {
- return 0L;
- }
-
- @Override
- public long getAccountedSize() {
- return 0L;
- }
-}
diff --git a/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java
new file mode 100644
index 0000000000..f09f14817b
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/ReferenceCountedArrowArray.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.ForeignAllocation;
+
+/**
+ * The owner of an imported C Data Interface array.
+ *
+ * <p>There is a fundamental mismatch here between memory allocation schemes: AllocationManager represents a single
+ * allocation (= a single address and length). But an ArrowArray combines multiple allocations behind a single
+ * deallocation callback. This class bridges the two by tracking a reference count, so that the single callback
+ * can be managed by multiple {@link ForeignAllocation} instances.
+ */
+final class ReferenceCountedArrowArray {
+ private final ArrowArray array;
+ private final AtomicInteger refCnt;
+
+ ReferenceCountedArrowArray(ArrowArray array) {
+ this.array = array;
+ this.refCnt = new AtomicInteger(1);
+ }
+
+ void retain() {
+ if (refCnt.addAndGet(1) - 1 <= 0) {
+ throw new IllegalStateException("Tried to retain a released ArrowArray");
+ }
+ }
+
+ void release() {
+ int refcnt = refCnt.addAndGet(-1);
+ if (refcnt == 0) {
+ array.release();
+ array.close();
+ } else if (refcnt < 0) {
+ throw new IllegalStateException("Reference count went negative for imported ArrowArray");
+ }
+ }
+
+ /**
+ * Create an ArrowBuf wrapping a buffer from this ArrowArray associated with the given BufferAllocator.
+ *
+ * <p>This method is "unsafe" because there is no validation of the given capacity or address. If the returned
+ * buffer is not freed, a memory leak will occur.
+ */
+ ArrowBuf unsafeAssociateAllocation(BufferAllocator trackingAllocator, long capacity, long memoryAddress) {
+ retain();
+ return trackingAllocator.wrapForeignAllocation(new ForeignAllocation(capacity, memoryAddress) {
+ @Override
+ protected void release0() {
+ ReferenceCountedArrowArray.this.release();
+ }
+ });
+ }
+}
diff --git a/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java
new file mode 100644
index 0000000000..2d31089ca7
--- /dev/null
+++ b/java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.ReferenceManager;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ArrowArrayUtilityTest {
+ BufferAllocator allocator;
+ ArrowArray arrowArray;
+ ReferenceCountedArrowArray dummyHandle;
+
+ @BeforeEach
+ void beforeEach() {
+ allocator = new RootAllocator();
+ arrowArray = ArrowArray.allocateNew(allocator);
+ dummyHandle = new ReferenceCountedArrowArray(arrowArray);
+ }
+
+ @AfterEach
+ void afterEach() {
+ dummyHandle.release();
+ allocator.close();
+ }
+
+ // ------------------------------------------------------------
+ // BufferImportTypeVisitor
+
+ @Test
+ void getBufferPtr() throws Exception {
+ // Note values are all dummy values here
+ try (BufferImportTypeVisitor visitor =
+ new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[]{0})) {
+
+ // Too few buffers
+ assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 1));
+
+ // Null where one isn't expected
+ assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 0));
+ }
+ }
+
+ @Test
+ void cleanupAfterFailure() throws Exception {
+ // Note values are all dummy values here
+ long address = MemoryUtil.UNSAFE.allocateMemory(16);
+ try (BufferImportTypeVisitor visitor =
+ new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[] {address})) {
+ // This fails, but only after we've already imported a buffer.
+ assertThrows(IllegalStateException.class, () -> visitor.visit(new ArrowType.Int(32, true)));
+ } finally {
+ MemoryUtil.UNSAFE.freeMemory(address);
+ }
+ }
+
+ @Test
+ void bufferAssociatedWithAllocator() throws Exception {
+ // Note values are all dummy values here
+ final long bufferSize = 16;
+ final long fieldLength = bufferSize / IntVector.TYPE_WIDTH;
+ long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize);
+ long baseline = allocator.getAllocatedMemory();
+ ArrowFieldNode fieldNode = new ArrowFieldNode(fieldLength, 0);
+ try (BufferImportTypeVisitor visitor =
+ new BufferImportTypeVisitor(allocator, dummyHandle, fieldNode, new long[] {0, address})) {
+ List<ArrowBuf> buffers = visitor.visit(new ArrowType.Int(32, true));
+ assertThat(buffers).hasSize(2);
+ assertThat(buffers.get(0)).isNull();
+ assertThat(buffers.get(1))
+ .isNotNull()
+ .extracting(ArrowBuf::getReferenceManager)
+ .extracting(ReferenceManager::getAllocator)
+ .isEqualTo(allocator);
+ assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline + bufferSize);
+ } finally {
+ MemoryUtil.UNSAFE.freeMemory(address);
+ }
+ assertThat(allocator.getAllocatedMemory()).isEqualTo(baseline);
+ }
+
+ // ------------------------------------------------------------
+ // ReferenceCountedArrowArray
+
+ @Test
+ void releaseRetain() {
+ ArrowArray array = ArrowArray.allocateNew(allocator);
+ ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array);
+ assertThat(array.isClosed()).isFalse();
+ handle.retain();
+ assertThat(array.isClosed()).isFalse();
+ handle.release();
+ assertThat(array.isClosed()).isFalse();
+ handle.release();
+ assertThat(array.isClosed()).isTrue();
+
+ assertThrows(IllegalStateException.class, handle::release);
+ assertThrows(IllegalStateException.class, handle::retain);
+ }
+
+ @Test
+ void associate() {
+ final long bufferSize = 16;
+ final long address = MemoryUtil.UNSAFE.allocateMemory(bufferSize);
+ try {
+ ArrowArray array = ArrowArray.allocateNew(allocator);
+ ReferenceCountedArrowArray handle = new ReferenceCountedArrowArray(array);
+ assertThat(array.isClosed()).isFalse();
+ ArrowBuf buf = handle.unsafeAssociateAllocation(allocator, bufferSize, address);
+ assertThat(array.isClosed()).isFalse();
+ buf.close();
+ assertThat(array.isClosed()).isFalse();
+ handle.release();
+ assertThat(array.isClosed()).isTrue();
+ } finally {
+ MemoryUtil.UNSAFE.freeMemory(address);
+ }
+ }
+}
diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
index 8aa9f85c25..fc73df449b 100644
--- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
@@ -99,6 +99,7 @@ import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.TransferPair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -106,14 +107,17 @@ import org.junit.jupiter.api.Test;
public class RoundtripTest {
private static final String EMPTY_SCHEMA_PATH = "";
private RootAllocator allocator = null;
+ private BufferAllocator childAllocator = null;
@BeforeEach
public void setUp() {
allocator = new RootAllocator(Long.MAX_VALUE);
+ childAllocator = allocator.newChildAllocator("child", 0, Long.MAX_VALUE);
}
@AfterEach
public void tearDown() {
+ childAllocator.close();
allocator.close();
}
@@ -130,7 +134,15 @@ public class RoundtripTest {
}
// Consumer imports vector
- return Data.importVector(allocator, consumerArrowArray, consumerArrowSchema, null);
+ FieldVector imported = Data.importVector(childAllocator, consumerArrowArray, consumerArrowSchema, null);
+ if (!(imported instanceof NullVector)) {
+ assertEquals(childAllocator, imported.getAllocator());
+ }
+
+ // Check that transfers work
+ TransferPair pair = imported.getTransferPair(allocator);
+ pair.transfer();
+ return (FieldVector) pair.getTo();
}
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
index 5f8ab12446..3071c02f30 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -17,48 +17,38 @@
package org.apache.arrow.memory;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.arrow.util.Preconditions;
/**
- * The abstract base class of AllocationManager.
+ * An AllocationManager is the implementation of a physical memory allocation.
*
- * <p>Manages the relationship between one or more allocators and a particular UDLE. Ensures that
- * one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
- * associated allocators.
+ * <p>Manages the relationship between the allocators and a particular memory allocation. Ensures that
+ * one allocator owns the memory that multiple allocators may be referencing. Manages a BufferLedger between
+ * each of its associated allocators. It does not track the reference count; that is the role of {@link BufferLedger}
+ * (aka {@link ReferenceManager}).
*
- * <p>The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
- * package which need access
- * to these objects or methods.
+ * <p>This is a public interface implemented by concrete allocator implementations (e.g. Netty or Unsafe).
*
* <p>Threading: AllocationManager manages thread-safety internally. Operations within the context
- * of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the
- * context of two ledgers
- * will acquire a lock on the AllocationManager instance. Important note, there is one
- * AllocationManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a
- * typical query. The
- * contention of acquiring a lock on AllocationManager should be very low.
+ * of a single BufferLedger are lockless in nature and can be leveraged by multiple threads. Operations that cross the
+ * context of two ledgers will acquire a lock on the AllocationManager instance. Important note, there is one
+ * AllocationManager per physical buffer allocation. As such, there will be thousands of these in a
+ * typical query. The contention of acquiring a lock on AllocationManager should be very low.
*/
public abstract class AllocationManager {
-
- private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
-
+ // The RootAllocator we are associated with. An allocation can only ever be associated with a single RootAllocator.
private final BufferAllocator root;
- private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
- // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
- // see JIRA for details
+ // An allocation can be tracked by multiple allocators. (This is because an allocator is more like a ledger.)
+ // All such allocators track reference counts individually, via BufferLedger instances. When an individual
+ // reference count reaches zero, the allocator will be dissociated from this allocation. If that was via the
+ // owningLedger, then no more allocators should be tracking this allocation, and the allocation will be freed.
+ // ARROW-1627: Trying to minimize memory overhead caused by previously used IdentityHashMap
private final LowCostIdentityHashMap<BufferAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
- private final long amCreationTime = System.nanoTime();
-
- // The ReferenceManager created at the time of creation of this AllocationManager
- // is treated as the owning reference manager for the underlying chunk of memory
- // managed by this allocation manager
+ // The primary BufferLedger (i.e. reference count) tracking this allocation.
+ // This is mostly a semantic constraint on the API user: if the reference count reaches 0 in the owningLedger, then
+ // there are not supposed to be any references through other allocators. In practice, this doesn't do anything
+ // as the implementation just forces ownership to be transferred to one of the other extant references.
private volatile BufferLedger owningLedger;
- private volatile long amDestructionTime = 0;
protected AllocationManager(BufferAllocator accountingAllocator) {
Preconditions.checkNotNull(accountingAllocator);
@@ -81,7 +71,7 @@ public abstract class AllocationManager {
/**
* Associate the existing underlying buffer with a new allocator. This will increase the
- * reference count on the corresponding buffer ledger by 1
+ * reference count on the corresponding buffer ledger by 1.
*
* @param allocator The target allocator to associate this buffer with.
* @return The reference manager (new or existing) that associates the underlying
@@ -99,6 +89,7 @@ public abstract class AllocationManager {
synchronized (this) {
BufferLedger ledger = map.get(allocator);
if (ledger != null) {
+ // We were already being tracked by the given allocator, just return it
if (retain) {
// bump the ref count for the ledger
ledger.increment();
@@ -106,6 +97,7 @@ public abstract class AllocationManager {
return ledger;
}
+ // We weren't previously being tracked by the given allocator; create a new ledger
ledger = new BufferLedger(allocator, this);
if (retain) {
@@ -161,7 +153,6 @@ public abstract class AllocationManager {
// free the memory chunk associated with the allocation manager
release0();
oldAllocator.getListener().onRelease(getSize());
- amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// since the refcount dropped to 0 for the owning reference manager and allocation
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index e59349c649..bb3816d9c4 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -235,4 +235,35 @@ public interface BufferAllocator extends AutoCloseable {
default RoundingPolicy getRoundingPolicy() {
return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
}
+
+ /**
+ * EXPERIMENTAL: Wrap an allocation created outside this BufferAllocator.
+ *
+ * <p>This is useful to integrate allocations from native code into the same memory management framework as
+ * Java-allocated buffers, presenting users a consistent API. The created buffer will be tracked by this allocator
+ * and can be transferred like Java-allocated buffers.
+ *
+ * <p>The underlying allocation will be closed when all references to the buffer are released. If this method throws,
+ * the underlying allocation will also be closed.
+ *
+ * @param allocation The underlying allocation.
+ */
+ default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
+ try {
+ forceAllocate(allocation.getSize());
+ final AllocationManager manager = new ForeignAllocationManager(this, allocation);
+ final BufferLedger ledger = manager.associate(this);
+ final ArrowBuf buf =
+ new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(), allocation.memoryAddress());
+ buf.writerIndex(allocation.getSize());
+ return buf;
+ } catch (Throwable t) {
+ try {
+ allocation.release0();
+ } catch (Throwable e) {
+ t.addSuppressed(e);
+ }
+ throw t;
+ }
+ }
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java
new file mode 100644
index 0000000000..c1b47382a3
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * EXPERIMENTAL: a memory allocation that does not come from a BufferAllocator, but rather an outside source (like JNI).
+ *
+ * <p>To use this, subclass this class and implement {@link #release0()} to free the allocation.
+ */
+public abstract class ForeignAllocation {
+ private final long memoryAddress;
+ private final long size;
+
+ /**
+ * Create a new AllocationManager representing an imported buffer.
+ *
+ * @param size The buffer size.
+ * @param memoryAddress The buffer address.
+ */
+ protected ForeignAllocation(long size, long memoryAddress) {
+ this.memoryAddress = memoryAddress;
+ this.size = size;
+ }
+
+ /**
+ * Get the size of this allocation.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * Get the address of this allocation.
+ */
+ protected long memoryAddress() {
+ return memoryAddress;
+ }
+
+ /**
+ * Free this allocation. Will only be called once.
+ */
+ protected abstract void release0();
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java
new file mode 100644
index 0000000000..741b866f81
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ForeignAllocationManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * An AllocationManager wrapping a ForeignAllocation.
+ */
+class ForeignAllocationManager extends AllocationManager {
+ private final ForeignAllocation allocation;
+
+ protected ForeignAllocationManager(BufferAllocator accountingAllocator, ForeignAllocation allocation) {
+ super(accountingAllocator);
+ this.allocation = allocation;
+ }
+
+ @Override
+ public long getSize() {
+ return allocation.getSize();
+ }
+
+ @Override
+ protected long memoryAddress() {
+ return allocation.memoryAddress();
+ }
+
+ @Override
+ protected void release0() {
+ allocation.release0();
+ }
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
index 00ae274b74..7d4de18751 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
@@ -18,8 +18,10 @@
package org.apache.arrow.memory;
/**
- * Reference Manager manages one or more ArrowBufs that share the
- * reference count for the underlying memory chunk.
+ * ReferenceManager is the reference count for one or more allocations.
+ *
+ * <p>In order to integrate with the core {@link BufferAllocator} implementation, the allocation itself should
+ * be represented by an {@link AllocationManager}, though this is not required by the API.
*/
public interface ReferenceManager {
@@ -70,6 +72,8 @@ public interface ReferenceManager {
* the target allocator-reference manager combination + 1 in the case that the provided allocator
* already had an association to this underlying memory.
*
+ * <p>The underlying allocation ({@link AllocationManager}) will not be copied.
+ *
* @param srcBuffer source ArrowBuf
* @param targetAllocator The target allocator to create an association with.
* @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf.
@@ -89,9 +93,10 @@ public interface ReferenceManager {
ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length);
/**
- * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
- * This will generate a new ArrowBuf that carries an association with the underlying memory
- * for the given ArrowBuf
+ * Duplicate the memory accounting ownership of the backing allocation of the given ArrowBuf in another allocator.
+ * This will generate a new ArrowBuf that carries an association with the same underlying memory
+ * ({@link AllocationManager}s) as the given ArrowBuf.
+ *
* @param sourceBuffer source ArrowBuf
* @param targetAllocator The target allocator to create an association with
* @return {@link OwnershipTransferResult} with info on transfer result and new buffer
diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
new file mode 100644
index 0000000000..5e40645e06
--- /dev/null
+++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestForeignAllocation {
+ BufferAllocator allocator;
+
+ @Before
+ public void before() {
+ allocator = new RootAllocator();
+ }
+
+ @After
+ public void after() {
+ allocator.close();
+ }
+
+ @Test
+ public void wrapForeignAllocation() {
+ final long bufferSize = 16;
+ UnsafeForeignAllocation allocation = new UnsafeForeignAllocation(bufferSize);
+ try {
+ assertEquals(0, allocator.getAllocatedMemory());
+ ArrowBuf buf = allocator.wrapForeignAllocation(allocation);
+ assertEquals(bufferSize, buf.capacity());
+ buf.close();
+ assertTrue(allocation.released);
+ } finally {
+ allocation.release0();
+ }
+ assertEquals(0, allocator.getAllocatedMemory());
+ }
+
+ private static class UnsafeForeignAllocation extends ForeignAllocation {
+ boolean released = false;
+
+ public UnsafeForeignAllocation(long bufferSize) {
+ super(bufferSize, MemoryUtil.UNSAFE.allocateMemory(bufferSize));
+ }
+
+ @Override
+ protected void release0() {
+ if (!released) {
+ MemoryUtil.UNSAFE.freeMemory(memoryAddress());
+ released = true;
+ }
+ }
+ }
+}