You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ju...@apache.org on 2017/03/21 22:24:25 UTC
[3/3] arrow git commit: ARROW-208: Add checkstyle policy to java
project
ARROW-208: Add checkstyle policy to java project
Author: Tsuyoshi Ozawa <oz...@apache.org>
Closes #96 from oza/ARROW-208 and squashes the following commits:
809e729 [Tsuyoshi Ozawa] reformatted code in memory and tools dir with IDE
40ee6a3 [Tsuyoshi Ozawa] ARROW-208: Add checkstyle policy to java project
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a9a57013
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a9a57013
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a9a57013
Branch: refs/heads/master
Commit: a9a570139966593ed84ddd842da73b60ace89e1e
Parents: a8bf0fb
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Tue Mar 21 15:24:19 2017 -0700
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Tue Mar 21 15:24:19 2017 -0700
----------------------------------------------------------------------
.../src/main/java/io/netty/buffer/ArrowBuf.java | 219 ++++----
.../java/io/netty/buffer/ExpandableByteBuf.java | 8 +-
.../main/java/io/netty/buffer/LargeBuffer.java | 9 +-
.../io/netty/buffer/MutableWrappedByteBuf.java | 18 +-
.../netty/buffer/PooledByteBufAllocatorL.java | 84 +--
.../netty/buffer/UnsafeDirectLittleEndian.java | 52 +-
.../org/apache/arrow/memory/Accountant.java | 102 ++--
.../apache/arrow/memory/AllocationListener.java | 4 +-
.../apache/arrow/memory/AllocationManager.java | 177 +++---
.../arrow/memory/AllocationReservation.java | 20 +-
.../arrow/memory/AllocatorClosedException.java | 6 +-
.../arrow/memory/ArrowByteBufAllocator.java | 14 +-
.../org/apache/arrow/memory/BaseAllocator.java | 539 ++++++++++---------
.../org/apache/arrow/memory/BoundsChecking.java | 7 +-
.../apache/arrow/memory/BufferAllocator.java | 80 +--
.../org/apache/arrow/memory/BufferManager.java | 15 +-
.../org/apache/arrow/memory/ChildAllocator.java | 18 +-
.../arrow/memory/OutOfMemoryException.java | 13 +-
.../org/apache/arrow/memory/RootAllocator.java | 6 +-
.../org/apache/arrow/memory/package-info.java | 49 +-
.../apache/arrow/memory/util/AssertionUtil.java | 15 +-
.../arrow/memory/util/AutoCloseableLock.java | 5 +-
.../apache/arrow/memory/util/HistoricalLog.java | 85 +--
.../apache/arrow/memory/util/StackTrace.java | 15 +-
java/pom.xml | 55 ++
.../java/org/apache/arrow/tools/EchoServer.java | 102 ++--
.../org/apache/arrow/tools/FileRoundtrip.java | 29 +-
.../org/apache/arrow/tools/FileToStream.java | 17 +-
.../org/apache/arrow/tools/Integration.java | 133 ++---
.../org/apache/arrow/tools/StreamToFile.java | 17 +-
.../arrow/tools/ArrowFileTestFixtures.java | 28 +-
.../org/apache/arrow/tools/EchoServerTest.java | 66 ++-
.../apache/arrow/tools/TestFileRoundtrip.java | 15 +-
.../org/apache/arrow/tools/TestIntegration.java | 159 +++---
34 files changed, 1218 insertions(+), 963 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index 95d2be5..e777b5a 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -6,27 +6,21 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+
+import io.netty.util.internal.PlatformDependent;
import org.apache.arrow.memory.AllocationManager.BufferLedger;
import org.apache.arrow.memory.ArrowByteBufAllocator;
@@ -37,15 +31,23 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.BufferManager;
import org.apache.arrow.memory.util.HistoricalLog;
-import com.google.common.base.Preconditions;
-
-import io.netty.util.internal.PlatformDependent;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
+
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
private static final AtomicLong idGenerator = new AtomicLong(0);
-
+ private static final int LOG_BYTES_PER_ROW = 10;
private final long id = idGenerator.incrementAndGet();
private final AtomicInteger refCnt;
private final UnsafeDirectLittleEndian udle;
@@ -55,9 +57,9 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
private final BufferManager bufManager;
private final ArrowByteBufAllocator alloc;
private final boolean isEmpty;
- private volatile int length;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null;
+ private volatile int length;
public ArrowBuf(
final AtomicInteger refCnt,
@@ -85,6 +87,17 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
+ public static String bufferState(final ByteBuf buf) {
+ final int cap = buf.capacity();
+ final int mcap = buf.maxCapacity();
+ final int ri = buf.readerIndex();
+ final int rb = buf.readableBytes();
+ final int wi = buf.writerIndex();
+ final int wb = buf.writableBytes();
+ return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d",
+ cap, mcap, ri, rb, wi, wb);
+ }
+
public ArrowBuf reallocIfNeeded(final int size) {
Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
@@ -95,7 +108,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
if (bufManager != null) {
return bufManager.replace(this, size);
} else {
- throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
+ throw new UnsupportedOperationException("Realloc is only available in the context of an " +
+ "operator's UDFs");
}
}
@@ -128,14 +142,13 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
/**
* Allows a function to determine whether not reading a particular string of bytes is valid.
- *
- * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that
+ * <p>
+ * Will throw an exception if the memory is not readable for some reason. Only doesn't
+ * something in the case that
* AssertionUtil.BOUNDS_CHECKING_ENABLED is true.
*
- * @param start
- * The starting position of the bytes to be read.
- * @param end
- * The exclusive endpoint of the bytes to be read.
+ * @param start The starting position of the bytes to be read.
+ * @param end The exclusive endpoint of the bytes to be read.
*/
public void checkBytes(int start, int end) {
if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
@@ -156,17 +169,21 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
/**
- * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of memory ownership and
- * accounting. This has no impact on the reference counting for the current ArrowBuf except in the situation where the
+ * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
+ * memory ownership and
+ * accounting. This has no impact on the reference counting for the current ArrowBuf except in
+ * the situation where the
* passed in Allocator is the same as the current buffer.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a
- * reference count of 1 (in the case that this is the first time this memory is being associated with the new
- * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination
+ * <p>
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created
+ * ArrowBuf with either have a
+ * reference count of 1 (in the case that this is the first time this memory is being
+ * associated with the new
+ * allocator) or the current value of the reference count + 1 for the other
+ * AllocationManager/BufferLedger combination
* in the case that the provided allocator already had an association to this underlying memory.
*
- * @param target
- * The target allocator to create an association with.
+ * @param target The target allocator to create an association with.
* @return A new ArrowBuf which shares the same underlying memory as this ArrowBuf.
*/
public ArrowBuf retain(BufferAllocator target) {
@@ -186,28 +203,39 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
/**
- * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will generate a new ArrowBuf
- * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is connected to the
- * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If
- * this ArrowBuf does not currently own the memory underlying it (and is only associated with it), this does not
+ * Transfer the memory accounting ownership of this ArrowBuf to another allocator. This will
+ * generate a new ArrowBuf
+ * that carries an association with the underlying memory of this ArrowBuf. If this ArrowBuf is
+ * connected to the
+ * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to
+ * the taret allocator. If
+ * this ArrowBuf does not currently own the memory underlying it (and is only associated with
+ * it), this does not
* transfer any ownership to the newly created ArrowBuf.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created ArrowBuf with either have a
- * reference count of 1 (in the case that this is the first time this memory is being associated with the new
- * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
+ * <p>
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created
+ * ArrowBuf with either have a
+ * reference count of 1 (in the case that this is the first time this memory is being
+ * associated with the new
+ * allocator) or the current value of the reference count for the other
+ * AllocationManager/BufferLedger combination in
* the case that the provided allocator already had an association to this underlying memory.
- *
- * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
- * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
- * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
- * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
- * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
+ * <p>
+ * Transfers will always succeed, even if that puts the other allocator into an overlimit
+ * situation. This is possible
+ * due to the fact that the original owning allocator may have allocated this memory out of a
+ * local reservation
+ * whereas the target allocator may need to allocate new memory from a parent or RootAllocator.
+ * This operation is done
+ * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could
+ * occur slightly prematurely
+ * to an actual overlimit==true condition. This is simply conservative behavior which means we
+ * may return overlimit
* slightly sooner than is necessary.
*
- * @param target
- * The allocator to transfer ownership to.
- * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
- * created ArrowBuf.
+ * @param target The allocator to transfer ownership to.
+ * @return A new transfer result with the impact of the transfer (whether it was overlimit) as
+ * well as the newly created ArrowBuf.
*/
public TransferResult transferOwnership(BufferAllocator target) {
@@ -223,28 +251,6 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
return new TransferResult(allocationFit, newBuf);
}
- /**
- * The outcome of a Transfer.
- */
- public class TransferResult {
-
- /**
- * Whether this transfer fit within the target allocator's capacity.
- */
- public final boolean allocationFit;
-
- /**
- * The newly created buffer associated with the target allocator.
- */
- public final ArrowBuf buffer;
-
- private TransferResult(boolean allocationFit, ArrowBuf buffer) {
- this.allocationFit = allocationFit;
- this.buffer = buffer;
- }
-
- }
-
@Override
public boolean release() {
return release(1);
@@ -261,7 +267,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
if (decrement < 1) {
- throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s",
+ throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer" +
+ " Info: %s",
decrement, toVerboseString()));
}
@@ -273,7 +280,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
if (refCnt < 0) {
throw new IllegalStateException(
- String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
+ String.format("ArrowBuf[%d] refCnt has gone negative. Buffer Info: %s", id,
+ toVerboseString()));
}
return refCnt == 0;
@@ -299,7 +307,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
return this;
}
- throw new UnsupportedOperationException("Buffers don't support resizing that increases the size.");
+ throw new UnsupportedOperationException("Buffers don't support resizing that increases the " +
+ "size.");
}
@Override
@@ -354,17 +363,6 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
return slice(readerIndex(), readableBytes());
}
- public static String bufferState(final ByteBuf buf) {
- final int cap = buf.capacity();
- final int mcap = buf.maxCapacity();
- final int ri = buf.readerIndex();
- final int rb = buf.readableBytes();
- final int wi = buf.writerIndex();
- final int wb = buf.writableBytes();
- return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d",
- cap, mcap, ri, rb, wi, wb);
- }
-
@Override
public ArrowBuf slice(int index, int length) {
@@ -373,7 +371,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
/*
- * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which
+ * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects
+ * .html#wiki-h3-5, which
* explains that derived buffers share their reference count with their parent
*/
final ArrowBuf newBuf = ledger.newArrowBuf(offset + index, length);
@@ -408,12 +407,12 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuffer[] nioBuffers() {
- return new ByteBuffer[] { nioBuffer() };
+ return new ByteBuffer[]{nioBuffer()};
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
- return new ByteBuffer[] { nioBuffer(index, length) };
+ return new ByteBuffer[]{nioBuffer(index, length)};
}
@Override
@@ -443,7 +442,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public String toString() {
- return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset + capacity());
+ return String.format("ArrowBuf[%d], udle: [%d %d..%d]", id, udle.id, offset, offset +
+ capacity());
}
@Override
@@ -738,7 +738,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
if (src.isDirect()) {
checkIndex(index, length);
- PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
+ PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this
+ .memoryAddress() + index,
length);
} else {
if (srcIndex == 0 && src.capacity() == length) {
@@ -788,7 +789,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
/**
- * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not shared, connected to larger
+ * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not
+ * shared, connected to larger
* underlying buffer of allocated memory)
*
* @return Size in bytes.
@@ -798,7 +800,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
/**
- * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the
+ * Return that is Accounted for by this buffer (and its potentially shared siblings within the
+ * context of the
* associated allocator).
*
* @return Size in bytes.
@@ -807,15 +810,11 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
return ledger.getAccountedSize();
}
- private final static int LOG_BYTES_PER_ROW = 10;
-
/**
* Return the buffer's byte contents in the form of a hex dump.
*
- * @param start
- * the starting byte index
- * @param length
- * how many bytes to log
+ * @param start the starting byte index
+ * @param length how many bytes to log
* @return A hex dump in a String.
*/
public String toHexString(final int start, final int length) {
@@ -878,5 +877,27 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
return this;
}
+ /**
+ * The outcome of a Transfer.
+ */
+ public class TransferResult {
+
+ /**
+ * Whether this transfer fit within the target allocator's capacity.
+ */
+ public final boolean allocationFit;
+
+ /**
+ * The newly created buffer associated with the target allocator.
+ */
+ public final ArrowBuf buffer;
+
+ private TransferResult(boolean allocationFit, ArrowBuf buffer) {
+ this.allocationFit = allocationFit;
+ this.buffer = buffer;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
index 7fb884d..9f8af93 100644
--- a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java
@@ -6,21 +6,23 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
import org.apache.arrow.memory.BufferAllocator;
/**
- * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of the Netty framework
+ * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of
+ * the Netty framework
* (thus supporting RPC level memory accounting).
*/
public class ExpandableByteBuf extends MutableWrappedByteBuf {
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
index c026e43..9a6e402 100644
--- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
+++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -6,21 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
/**
- * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
+ * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and
+ * counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {
+
public LargeBuffer(ByteBuf buffer) {
super(buffer);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
index 5709473..a5683ad 100644
--- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
import java.io.IOException;
@@ -26,16 +27,12 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
/**
- * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
+ * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override
+ * some behaviors and make
* buffer mutable.
*/
abstract class MutableWrappedByteBuf extends AbstractByteBuf {
- @Override
- public ByteBuffer nioBuffer(int index, int length) {
- return unwrap().nioBuffer(index, length);
- }
-
ByteBuf buffer;
public MutableWrappedByteBuf(ByteBuf buffer) {
@@ -51,6 +48,11 @@ abstract class MutableWrappedByteBuf extends AbstractByteBuf {
}
@Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return unwrap().nioBuffer(index, length);
+ }
+
+ @Override
public ByteBuf unwrap() {
return buffer;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index a843ac5..b6de2e3 100644
--- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -6,42 +6,44 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.netty.buffer;
-import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+import io.netty.util.internal.StringUtil;
+
+import org.apache.arrow.memory.OutOfMemoryException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.arrow.memory.OutOfMemoryException;
-
-import io.netty.util.internal.StringUtil;
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
/**
- * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
- private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
- private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" +
+ ".allocator");
+ private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ public final UnsafeDirectLittleEndian empty;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);
-
private final InnerAllocator allocator;
- public final UnsafeDirectLittleEndian empty;
public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
@@ -78,6 +80,7 @@ public class PooledByteBufAllocatorL {
}
private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;
@@ -89,7 +92,8 @@ public class PooledByteBufAllocatorL {
this.size = size;
}
- private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
+ private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+ AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
@@ -119,6 +123,7 @@ public class PooledByteBufAllocatorL {
}
private class InnerAllocator extends PooledByteBufAllocator {
+
private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
private final int chunkSize;
@@ -131,7 +136,8 @@ public class PooledByteBufAllocatorL {
f.setAccessible(true);
this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
} catch (Exception e) {
- throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ throw new RuntimeException("Failure while initializing allocator. Unable to retrieve " +
+ "direct arenas field.", e);
}
this.chunkSize = directArenas[0].chunkSize;
@@ -158,7 +164,8 @@ public class PooledByteBufAllocatorL {
hugeBufferCount.incrementAndGet();
// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
- return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
+ return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
+ hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
@@ -173,7 +180,8 @@ public class PooledByteBufAllocatorL {
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();
- return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
+ return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
+ normalBufferCount, normalBufferSize);
}
} else {
@@ -183,7 +191,8 @@ public class PooledByteBufAllocatorL {
private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
- "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
+ "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
+ "didn't provide that functionality.");
}
@Override
@@ -203,7 +212,8 @@ public class PooledByteBufAllocatorL {
private void validate(int initialCapacity, int maxCapacity) {
if (initialCapacity < 0) {
- throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
+ throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: " +
+ "0+)");
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
@@ -212,26 +222,6 @@ public class PooledByteBufAllocatorL {
}
}
- private class MemoryStatusThread extends Thread {
-
- public MemoryStatusThread() {
- super("allocation.logger");
- this.setDaemon(true);
- }
-
- @Override
- public void run() {
- while (true) {
- memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
- try {
- Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
- }
-
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
@@ -256,6 +246,26 @@ public class PooledByteBufAllocatorL {
return buf.toString();
}
+ private class MemoryStatusThread extends Thread {
+
+ public MemoryStatusThread() {
+ super("allocation.logger");
+ this.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 5ea1767..87d822f 100644
--- a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,22 +18,31 @@
package io.netty.buffer;
+import io.netty.util.internal.PlatformDependent;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLong;
-import io.netty.util.internal.PlatformDependent;
-
/**
- * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
+ * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
+ * to abstract away the
* Netty classes and underlying Netty memory management.
*/
public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+
+ public static final boolean ASSERT_ENABLED;
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ static {
+ boolean isAssertEnabled = false;
+ assert isAssertEnabled = true;
+ ASSERT_ENABLED = isAssertEnabled;
+ }
+
public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;
@@ -60,21 +69,22 @@ public class UnsafeDirectLittleEndian extends WrappedByteBuf {
this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
- private long addr(int index) {
- return memoryAddress + index;
- }
- @Override
- public long getLong(int index) {
+ private long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ @Override
+ public long getLong(int index) {
// wrapped.checkIndex(index, 8);
- long v = PlatformDependent.getLong(addr(index));
- return v;
- }
+ long v = PlatformDependent.getLong(addr(index));
+ return v;
+ }
- @Override
- public float getFloat(int index) {
- return Float.intBitsToFloat(getInt(index));
- }
+ @Override
+ public float getFloat(int index) {
+ return Float.intBitsToFloat(getInt(index));
+ }
@Override
public ByteBuf slice() {
@@ -259,12 +269,4 @@ public class UnsafeDirectLittleEndian extends WrappedByteBuf {
return System.identityHashCode(this);
}
- public static final boolean ASSERT_ENABLED;
-
- static {
- boolean isAssertEnabled = false;
- assert isAssertEnabled = true;
- ASSERT_ENABLED = isAssertEnabled;
- }
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
index 37c598a..6ddc8f7 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
@@ -6,30 +6,33 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
+import com.google.common.base.Preconditions;
+
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
-import com.google.common.base.Preconditions;
-
/**
- * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All
+ * Provides a concurrent way to manage account for memory usage without locking. Used as basis
+ * for Allocators. All
* operations are threadsafe (except for close).
*/
@ThreadSafe
class Accountant implements AutoCloseable {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant
+ // .class);
/**
* The parent allocator
@@ -37,7 +40,8 @@ class Accountant implements AutoCloseable {
protected final Accountant parent;
/**
- * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the
+ * The amount of memory reserved for this allocator. Releases below this amount of memory will
+ * not be returned to the
* parent Accountant until this Accountant is closed.
*/
protected final long reservation;
@@ -45,7 +49,8 @@ class Accountant implements AutoCloseable {
private final AtomicLong peakAllocation = new AtomicLong();
/**
- * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to
+ * Maximum local memory that can be held. This can be externally updated. Changing it won't
+ * cause past memory to
* change but will change responses to future allocation efforts
*/
private final AtomicLong allocationLimit = new AtomicLong();
@@ -56,11 +61,14 @@ class Accountant implements AutoCloseable {
private final AtomicLong locallyHeldMemory = new AtomicLong();
public Accountant(Accountant parent, long reservation, long maxAllocation) {
- Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
- Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
+ Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be " +
+ "non-negative.");
+ Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be " +
+ "non-negative.");
Preconditions.checkArgument(reservation <= maxAllocation,
"The initial reservation size must be <= the maximum allocation.");
- Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
+ Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't " +
+ "reserve memory.");
this.parent = parent;
this.reservation = reservation;
@@ -72,19 +80,20 @@ class Accountant implements AutoCloseable {
if (!outcome.isOk()) {
throw new OutOfMemoryException(String.format(
"Failure trying to allocate initial reservation for Allocator. "
- + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name()));
+ + "Attempted to allocate %d bytes and received an outcome of %s.", reservation,
+ outcome.name()));
}
}
}
/**
- * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a
+ * Attempt to allocate the requested amount of memory. Either completely succeeds or completely
+ * fails. Constructs a a
* log of delta
- *
+ * <p>
* If it fails, no changes are made to accounting.
*
- * @param size
- * The amount of memory to reserve in bytes.
+ * @param size The amount of memory to reserve in bytes.
* @return True if the allocation was successful, false if the allocation failed.
*/
AllocationOutcome allocateBytes(long size) {
@@ -116,8 +125,7 @@ class Accountant implements AutoCloseable {
/**
* Increase the accounting. Returns whether the allocation fit within limits.
*
- * @param size
- * to increase
+ * @param size to increase
* @return Whether the allocation fit within limits.
*/
boolean forceAllocate(long size) {
@@ -126,24 +134,29 @@ class Accountant implements AutoCloseable {
}
/**
- * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation
- * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence
- * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at
- * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to
- * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know
- * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force
- * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur
+ * Internal method for allocation. This takes a forced approach to allocation to ensure that we
+ * manage reservation
+ * boundary issues consistently. Allocation is always done through the entire tree. The two
+ * options that we influence
+ * are whether the allocation should be forced and whether or not the peak memory allocation
+ * should be updated. If at
+ * some point during allocation escalation we determine that the allocation is no longer
+ * possible, we will continue to
+ * do a complete and consistent allocation but we will stop updating the peak allocation. We do
+ * this because we know
+ * that we will be directly unwinding this allocation (and thus never actually making the
+ * allocation). If force
+ * allocation is passed, then we continue to update the peak limits since we now know that this
+ * allocation will occur
* despite our moving past one or more limits.
*
- * @param size
- * The size of the allocation.
- * @param incomingUpdatePeak
- * Whether we should update the local peak for this allocation.
- * @param forceAllocation
- * Whether we should force the allocation.
+ * @param size The size of the allocation.
+ * @param incomingUpdatePeak Whether we should update the local peak for this allocation.
+ * @param forceAllocation Whether we should force the allocation.
* @return The outcome of the allocation.
*/
- private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) {
+ private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final
+ boolean forceAllocation) {
final long newLocal = locallyHeldMemory.addAndGet(size);
final long beyondReservation = newLocal - reservation;
final boolean beyondLimit = newLocal > allocationLimit.get();
@@ -173,7 +186,7 @@ class Accountant implements AutoCloseable {
Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
final long originalSize = newSize + size;
- if(originalSize > reservation && parent != null){
+ if (originalSize > reservation && parent != null) {
// we deallocated memory that we should release to our parent.
final long possibleAmountToReleaseToParent = originalSize - reservation;
final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
@@ -182,16 +195,6 @@ class Accountant implements AutoCloseable {
}
- /**
- * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation.
- *
- * @param newLimit
- * The limit in bytes.
- */
- public void setLimit(long newLimit) {
- allocationLimit.set(newLimit);
- }
-
public boolean isOverLimit() {
return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
}
@@ -216,7 +219,18 @@ class Accountant implements AutoCloseable {
}
/**
- * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not
+ * Set the maximum amount of memory that can be allocated in the this Accountant before failing
+ * an allocation.
+ *
+ * @param newLimit The limit in bytes.
+ */
+ public void setLimit(long newLimit) {
+ allocationLimit.set(newLimit);
+ }
+
+ /**
+ * Return the current amount of allocated memory that this Accountant is managing accounting
+ * for. Note this does not
* include reservation memory that hasn't been allocated.
*
* @return Currently allocate memory in bytes.
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
index 1b127f8..d36cb37 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
/**
* An allocation listener being notified for allocation/deallocation
- *
+ * <p>
* It is expected to be called from multiple threads and as such,
* provider should take care of making the implementation thread-safe
*/
public interface AllocationListener {
+
public static final AllocationListener NOOP = new AllocationListener() {
@Override
public void onAllocation(long size) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index f15bb8a..683752e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -6,53 +6,62 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
-import static org.apache.arrow.memory.BaseAllocator.indent;
+import com.google.common.base.Preconditions;
-import java.util.IdentityHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.AutoCloseableLock;
import org.apache.arrow.memory.util.HistoricalLog;
-import com.google.common.base.Preconditions;
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
+import static org.apache.arrow.memory.BaseAllocator.indent;
/**
- * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
- * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that
+ * one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
+ * associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the
+ * Netty-based
* PooledByteBufAllocatorL.
- *
- * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's package which need access
+ * <p>
+ * The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
+ * package which need access
* to these objects or methods.
- *
- * Threading: AllocationManager manages thread-safety internally. Operations within the context of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
- * will acquire a lock on the AllocationManager instance. Important note, there is one AllocationManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ * <p>
+ * Threading: AllocationManager manages thread-safety internally. Operations within the context
+ * of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the
+ * context of two ledgers
+ * will acquire a lock on the AllocationManager instance. Important note, there is one
+ * AllocationManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a
+ * typical query. The
* contention of acquiring a lock on AllocationManager should be very low.
- *
*/
public class AllocationManager {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationManager.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger
+ // (AllocationManager.class);
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
@@ -81,17 +90,19 @@ public class AllocationManager {
this.root = accountingAllocator.root;
this.underlying = INNER_ALLOCATOR.allocate(size);
- // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a
+ // we do a no retain association since our creator will want to retrieve the newly created
+ // ledger and will create a
// reference count at that point
this.owningLedger = associate(accountingAllocator, false);
this.size = underlying.capacity();
}
/**
- * Associate the existing underlying buffer with a new allocator. This will increase the reference count to the
+ * Associate the existing underlying buffer with a new allocator. This will increase the
+ * reference count to the
* provided ledger by 1.
- * @param allocator
- * The target allocator to associate this buffer with.
+ *
+ * @param allocator The target allocator to associate this buffer with.
* @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
*/
BufferLedger associate(final BaseAllocator allocator) {
@@ -118,7 +129,8 @@ public class AllocationManager {
}
try (AutoCloseableLock write = writeLock.open()) {
- // we have to recheck existing ledger since a second reader => writer could be competing with us.
+ // we have to recheck existing ledger since a second reader => writer could be competing
+ // with us.
final BufferLedger existingLedger = map.get(allocator);
if (existingLedger != null) {
@@ -141,7 +153,8 @@ public class AllocationManager {
/**
- * The way that a particular BufferLedger communicates back to the AllocationManager that it now longer needs to hold
+ * The way that a particular BufferLedger communicates back to the AllocationManager that it
+ * now longer needs to hold
* a reference to particular piece of memory.
*/
private class ReleaseListener {
@@ -169,16 +182,19 @@ public class AllocationManager {
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
- // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+ // we need to change the owning allocator. we've been removed so we'll get whatever is
+ // top of list
BufferLedger newLedger = map.values().iterator().next();
- // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+ // we'll forcefully transfer the ownership and not worry about whether we exceeded the
+ // limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
- throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger.");
+ throw new IllegalStateException("The final removal of a ledger should be connected to " +
+ "the owning ledger.");
}
}
@@ -187,25 +203,30 @@ public class AllocationManager {
}
/**
- * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+ * The reference manager that binds an allocator manager to a particular BaseAllocator. Also
+ * responsible for creating
* a set of ArrowBufs that share a common fate and set of reference counts.
- * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io.netty.buffer package.
+ * As with AllocationManager, the only reason this is public is due to ArrowBuf being in io
+ * .netty.buffer package.
*/
public class BufferLedger {
private final IdentityHashMap<ArrowBuf, Object> buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<ArrowBuf, Object>() : null;
- private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
- private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
- // correctly
+ private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to
+ // each ledger
+ private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can
+ // manage request for retain
+ // correctly
private final long lCreationTime = System.nanoTime();
- private volatile long lDestructionTime = 0;
private final BaseAllocator allocator;
private final ReleaseListener listener;
- private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
- "BufferLedger[%d]", 1)
+ private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
+ (BaseAllocator.DEBUG_LOG_LENGTH,
+ "BufferLedger[%d]", 1)
: null;
+ private volatile long lDestructionTime = 0;
private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
this.allocator = allocator;
@@ -213,10 +234,11 @@ public class AllocationManager {
}
/**
- * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
+ * Transfer any balance the current ledger has to the target ledger. In the case that the
+ * current ledger holds no
* memory, no transfer is made to the new ledger.
- * @param target
- * The ledger to transfer ownership account to.
+ *
+ * @param target The ledger to transfer ownership account to.
* @return Whether transfer fit within target ledgers limits.
*/
public boolean transferBalance(final BufferLedger target) {
@@ -231,7 +253,8 @@ public class AllocationManager {
return true;
}
- // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
+ // since two balance transfers out from the allocator manager could cause incorrect
+ // accounting, we need to ensure
// that this won't happen by synchronizing on the allocator manager instance.
try (AutoCloseableLock write = writeLock.open()) {
if (owningLedger != this) {
@@ -253,12 +276,10 @@ public class AllocationManager {
/**
* Print the current ledger state to a the provided StringBuilder.
- * @param sb
- * The StringBuilder to populate.
- * @param indent
- * The level of indentation to position the data.
- * @param verbosity
- * The level of verbosity to print.
+ *
+ * @param sb The StringBuilder to populate.
+ * @param indent The level of indentation to position the data.
+ * @param verbosity The level of verbosity to print.
*/
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
indent(sb, indent)
@@ -304,7 +325,8 @@ public class AllocationManager {
}
/**
- * Decrement the ledger's reference count. If the ledger is decremented to zero, this ledger should release its
+ * Decrement the ledger's reference count. If the ledger is decremented to zero, this ledger
+ * should release its
* ownership back to the AllocationManager
*/
public int decrement(int decrement) {
@@ -323,15 +345,19 @@ public class AllocationManager {
}
/**
- * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
- * ledger associated with this AllocationManager, a new one is created. This is placed on BufferLedger rather than
- * AllocationManager directly because ArrowBufs don't have access to AllocationManager and they are the ones
- * responsible for exposing the ability to associate multiple allocators with a particular piece of underlying
- * memory. Note that this will increment the reference count of this ledger by one to ensure the ledger isn't
+ * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator
+ * doesn't currently have a
+ * ledger associated with this AllocationManager, a new one is created. This is placed on
+ * BufferLedger rather than
+ * AllocationManager directly because ArrowBufs don't have access to AllocationManager and
+ * they are the ones
+ * responsible for exposing the ability to associate multiple allocators with a particular
+ * piece of underlying
+ * memory. Note that this will increment the reference count of this ledger by one to ensure
+ * the ledger isn't
* destroyed before use.
*
- * @param allocator
- * A BufferAllocator.
+ * @param allocator A BufferAllocator.
* @return The ledger associated with the BufferAllocator.
*/
public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
@@ -339,13 +365,14 @@ public class AllocationManager {
}
/**
- * Create a new ArrowBuf associated with this AllocationManager and memory. Does not impact reference count.
+ * Create a new ArrowBuf associated with this AllocationManager and memory. Does not impact
+ * reference count.
* Typically used for slicing.
- * @param offset
- * The offset in bytes to start this new ArrowBuf.
- * @param length
- * The length in bytes that this ArrowBuf will provide access to.
- * @return A new ArrowBuf that shares references with all ArrowBufs associated with this BufferLedger
+ *
+ * @param offset The offset in bytes to start this new ArrowBuf.
+ * @param length The length in bytes that this ArrowBuf will provide access to.
+ * @return A new ArrowBuf that shares references with all ArrowBufs associated with this
+ * BufferLedger
*/
public ArrowBuf newArrowBuf(int offset, int length) {
allocator.assertOpen();
@@ -354,13 +381,13 @@ public class AllocationManager {
/**
* Create a new ArrowBuf associated with this AllocationManager and memory.
- * @param offset
- * The offset in bytes to start this new ArrowBuf.
- * @param length
- * The length in bytes that this ArrowBuf will provide access to.
- * @param manager
- * An optional BufferManager argument that can be used to manage expansion of this ArrowBuf
- * @return A new ArrowBuf that shares references with all ArrowBufs associated with this BufferLedger
+ *
+ * @param offset The offset in bytes to start this new ArrowBuf.
+ * @param length The length in bytes that this ArrowBuf will provide access to.
+ * @param manager An optional BufferManager argument that can be used to manage expansion of
+ * this ArrowBuf
+ * @return A new ArrowBuf that shares references with all ArrowBufs associated with this
+ * BufferLedger
*/
public ArrowBuf newArrowBuf(int offset, int length, BufferManager manager) {
allocator.assertOpen();
@@ -377,7 +404,8 @@ public class AllocationManager {
if (BaseAllocator.DEBUG) {
historicalLog.recordEvent(
- "ArrowBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
+ "ArrowBuf(BufferLedger, BufferAllocator[%s], " +
+ "UnsafeDirectLittleEndian[identityHashCode == "
+ "%d](%s)) => ledger hc == %d",
allocator.name, System.identityHashCode(buf), buf.toString(),
System.identityHashCode(this));
@@ -401,7 +429,8 @@ public class AllocationManager {
}
/**
- * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
+ * How much memory is accounted for by this ledger. This is either getSize() if this is the
+ * owning ledger for the
* memory or zero in the case that this is not the owning ledger associated with this memory.
*
* @return Amount of accounted(owned) memory associated with this ledger.
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
index 68d1244..7f5aa31 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
@@ -6,32 +6,36 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
import io.netty.buffer.ArrowBuf;
/**
- * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they
- * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception
+ * Supports cumulative allocation reservation. Clients may increase the size of the reservation
+ * repeatedly until they
+ * call for an allocation of the current total size. The reservation can only be used once, and
+ * will throw an exception
* if it is used more than once.
* <p>
- * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not.
+ * For the purposes of airtight memory accounting, the reservation must be close()d whether it is
+ * used or not.
* This is not threadsafe.
*/
public interface AllocationReservation extends AutoCloseable {
/**
* Add to the current reservation.
- *
+ * <p>
* <p>Adding may fail if the allocator is not allowed to consume any more space.
*
* @param nBytes the number of bytes to add
@@ -42,7 +46,7 @@ public interface AllocationReservation extends AutoCloseable {
/**
* Requests a reservation of additional space.
- *
+ * <p>
* <p>The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
@@ -52,7 +56,7 @@ public interface AllocationReservation extends AutoCloseable {
/**
* Allocate a buffer whose size is the total of all the add()s made.
- *
+ * <p>
* <p>The allocation request can still fail, even if the amount of space
* requested is available, if the allocation cannot be made contiguously.
*
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
index 3274642..d5b638e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
/**
@@ -23,6 +24,7 @@ package org.apache.arrow.memory;
*/
@SuppressWarnings("serial")
public class AllocatorClosedException extends RuntimeException {
+
/**
* @param message string associated with the cause
*/
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9a57013/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
index 5dc5ac3..b8b5283 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.arrow.memory;
import io.netty.buffer.ByteBuf;
@@ -23,9 +24,12 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ExpandableByteBuf;
/**
- * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC layer to be accounted
- * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is
- * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
+ * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC
+ * layer to be accounted
+ * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a
+ * typical BufferAllocator is
+ * the signature and the fact that this Allocator returns ExpandableByteBufs which enable
+ * otherwise non-expandable
* ArrowBufs to be expandable.
*/
public class ArrowByteBufAllocator implements ByteBufAllocator {