You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2024/01/17 15:41:52 UTC
(camel-quarkus) 03/06: Workaround BigQuery & Apache Arrow Netty incompatibilities #5641
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 7300cc0d98df87d639c6706ac8e40dcbbc896973
Author: James Netherton <ja...@gmail.com>
AuthorDate: Mon Jan 8 14:37:02 2024 +0000
Workaround BigQuery & Apache Arrow Netty incompatibilities #5641
---
.../src/main/java/io/netty/buffer/LargeBuffer.java | 34 ++
.../io/netty/buffer/MutableWrappedByteBuf.java | 447 +++++++++++++++++++++
.../io/netty/buffer/PooledByteBufAllocatorL.java | 275 +++++++++++++
.../io/netty/buffer/UnsafeDirectLittleEndian.java | 261 ++++++++++++
4 files changed, 1017 insertions(+)
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000000..306ce49433
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+/**
+ * A {@link MutableWrappedByteBuf} used to wrap buffers that are too large to be served from a
+ * pooled chunk. It keeps no metrics itself; the huge-buffer byte and count totals are tracked by
+ * {@code PooledByteBufAllocatorL}, which wraps each instance in an accounting buffer.
+ */
+public class LargeBuffer extends MutableWrappedByteBuf {
+
+ /**
+ * Wraps the given buffer; the superclass adopts its reader and writer indexes.
+ */
+ public LargeBuffer(ByteBuf buffer) {
+ super(buffer);
+ }
+
+ /**
+ * Copies the requested region of the wrapped buffer and wraps the copy in a new
+ * {@code LargeBuffer}, so the result keeps this type.
+ */
+ @Override
+ public ByteBuf copy(int index, int length) {
+ return new LargeBuffer(buffer.copy(index, length));
+ }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000000..005a049f86
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.ByteProcessor;
+
+/**
+ * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override
+ * some behaviors and make buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+ ByteBuf buffer;
+
+ /**
+ * Wraps {@code buffer}, inheriting its maximum capacity and current reader/writer indexes.
+ *
+ * @param buffer the buffer to delegate to
+ */
+ public MutableWrappedByteBuf(ByteBuf buffer) {
+ super(buffer.maxCapacity());
+
+ // Avoid stacking wrappers: when handed another MutableWrappedByteBuf, delegate straight to
+ // the buffer it wraps.
+ if (buffer instanceof MutableWrappedByteBuf) {
+ this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+ } else {
+ this.buffer = buffer;
+ }
+
+ // The indexes are taken from the argument itself, not from the unwrapped delegate.
+ setIndex(buffer.readerIndex(), buffer.writerIndex());
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return unwrap().nioBuffer(index, length);
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return buffer;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return buffer.alloc();
+ }
+
+ @Override
+ public ByteOrder order() {
+ return buffer.order();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+ @Override
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ buffer.capacity(newCapacity);
+ return this;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return buffer.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return buffer.hasMemoryAddress();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return buffer.memoryAddress();
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return _getByte(index);
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return buffer.getByte(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return _getShort(index);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ @Override
+ public short getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
+ protected short _getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ return _getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return buffer.getUnsignedMedium(index);
+ }
+
+ @Override
+ public int getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return _getInt(index);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ @Override
+ public int getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
+ protected int _getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return _getLong(index);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return buffer.getLong(index);
+ }
+
+ @Override
+ public long getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
+ protected long _getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
+ public abstract ByteBuf copy(int index, int length);
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ buffer.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ _setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ buffer.setByte(index, value);
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ _setShort(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ buffer.setShort(index, value);
+ }
+
+ @Override
+ public ByteBuf setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ _setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ buffer.setMedium(index, value);
+ }
+
+ @Override
+ public ByteBuf setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ _setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ buffer.setInt(index, value);
+ }
+
+ @Override
+ public ByteBuf setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ _setLong(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ buffer.setLong(index, value);
+ }
+
+ @Override
+ public ByteBuf setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ buffer.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public int setBytes(int index, FileChannel in, long position, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, position, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length)
+ throws IOException {
+ buffer.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, length);
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int getBytes(int index, FileChannel out, long position, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, position, length);
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return buffer.nioBufferCount();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return buffer.nioBuffers(index, length);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ return nioBuffer(index, length);
+ }
+
+ @Override
+ public int forEachByte(int index, int length, ByteProcessor processor) {
+ return buffer.forEachByte(index, length, processor);
+ }
+
+ @Override
+ public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+ return buffer.forEachByteDesc(index, length, processor);
+ }
+
+ @Override
+ public final int refCnt() {
+ return unwrap().refCnt();
+ }
+
+ @Override
+ public final ByteBuf touch() {
+ unwrap().touch();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf touch(Object hint) {
+ unwrap().touch(hint);
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain() {
+ unwrap().retain();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain(int increment) {
+ unwrap().retain(increment);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = unwrap().release(decrement);
+ return released;
+ }
+
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 0000000000..1202f003db
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.OutOfDirectMemoryError;
+import io.netty.util.internal.StringUtil;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+
+/**
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
+
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
+
+ private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ public final UnsafeDirectLittleEndian empty;
+ private final AtomicLong hugeBufferSize = new AtomicLong(0);
+ private final AtomicLong hugeBufferCount = new AtomicLong(0);
+ private final AtomicLong normalBufferSize = new AtomicLong(0);
+ private final AtomicLong normalBufferCount = new AtomicLong(0);
+ private final InnerAllocator allocator;
+
+ public PooledByteBufAllocatorL() {
+ allocator = new InnerAllocator();
+ empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+ }
+
+ /**
+ * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
+ *
+ * <p>The size is narrowed with {@code LargeMemoryUtil.checkedCastToInt} (presumably rejecting
+ * values that do not fit in an {@code int} -- confirm against Arrow) and the buffer is requested
+ * from the inner allocator with a maximum capacity of {@code Integer.MAX_VALUE}.
+ *
+ * @param size requested initial capacity in bytes
+ * @return the newly allocated buffer
+ * @throws OutOfMemoryException if the allocation fails with Netty's
+ * {@code OutOfDirectMemoryError} or an {@code OutOfMemoryError} whose message is
+ * "Direct buffer memory"; any other {@code OutOfMemoryError} is rethrown unchanged
+ */
+ public UnsafeDirectLittleEndian allocate(long size) {
+ try {
+ return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE);
+ } catch (OutOfMemoryError e) {
+ /*
+ * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
+ * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
+ * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
+ * as Netty is expected to throw an OutOfDirectMemoryError first.
+ */
+ if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) {
+ throw new OutOfMemoryException("Failure allocating buffer.", e);
+ }
+ throw e;
+ }
+ }
+
+ public int getChunkSize() {
+ return allocator.chunkSize();
+ }
+
+ public long getHugeBufferSize() {
+ return hugeBufferSize.get();
+ }
+
+ public long getHugeBufferCount() {
+ return hugeBufferCount.get();
+ }
+
+ public long getNormalBufferSize() {
+ return normalBufferSize.get();
+ }
+
+ /**
+ * Returns the number of normal (within-chunk) buffers currently accounted for.
+ *
+ * <p>Reads {@code normalBufferCount}; reading {@code normalBufferSize} here would report the
+ * byte total instead of the buffer count.
+ *
+ * @return the outstanding normal buffer count
+ */
+ public long getNormalBufferCount() {
+ return normalBufferCount.get();
+ }
+
+ /**
+ * An {@code UnsafeDirectLittleEndian} that updates a shared count/size pair when it is fully
+ * released. The counters are incremented by the code that creates the instance; this class only
+ * performs the matching decrement.
+ */
+ private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
+ // Capacity captured at construction; this is the amount subtracted from size on release,
+ // regardless of any later capacity change.
+ private final long initialCapacity;
+ private final AtomicLong count;
+ private final AtomicLong size;
+
+ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+ AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ @Override
+ public ByteBuf copy() {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ /**
+ * Releases the buffer and, only when the superclass reports it as released, decrements the
+ * shared counters. The no-argument {@code release()} of the superclass forwards to
+ * {@code release(1)}, so it is covered by this override as well.
+ */
+ @Override
+ public boolean release(int decrement) {
+ boolean released = super.release(decrement);
+ if (released) {
+ count.decrementAndGet();
+ size.addAndGet(-initialCapacity);
+ }
+ return released;
+ }
+
+ }
+
+ private class InnerAllocator extends PooledByteBufAllocator {
+
+ private final PoolArena<ByteBuffer>[] directArenas;
+ private final MemoryStatusThread statusThread;
+
+ public InnerAllocator() {
+ super(true);
+
+ try {
+ Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+ f.setAccessible(true);
+ this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ }
+
+ if (memoryLogger.isTraceEnabled()) {
+ statusThread = new MemoryStatusThread(this);
+ statusThread.start();
+ } else {
+ statusThread = null;
+ }
+ }
+
+ /**
+ * Allocates a direct buffer, choosing the strategy by size.
+ *
+ * <p>Requests larger than the chunk size bypass the pool, are allocated unpooled, and are always
+ * counted in the huge-buffer metrics. Smaller requests come from the calling thread's direct
+ * arena and are only counted in the normal-buffer metrics when {@code ASSERT_ENABLED} is set.
+ *
+ * @param initialCapacity requested capacity in bytes
+ * @param maxCapacity upper bound the buffer may grow to
+ * @return the allocated buffer wrapped as an {@code UnsafeDirectLittleEndian}
+ * @throws UnsupportedOperationException if the thread cache has no direct arena, or the arena
+ * returns something other than a {@code PooledUnsafeDirectByteBuf}
+ */
+ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+ PoolThreadCache cache = threadCache();
+ PoolArena<ByteBuffer> directArena = cache.directArena;
+
+ if (directArena == null) {
+ throw fail();
+ }
+
+ if (initialCapacity > chunkSize()) {
+ // This is beyond chunk size so we'll allocate separately.
+ ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+ hugeBufferSize.addAndGet(buf.capacity());
+ hugeBufferCount.incrementAndGet();
+
+ return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
+ hugeBufferSize);
+ }
+
+ // within chunk, use arena.
+ ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+ if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+ // fail() only builds the exception, so it must be thrown here; otherwise the casts below
+ // would surface as an unexplained ClassCastException.
+ throw fail();
+ }
+
+ if (!ASSERT_ENABLED) {
+ // Without assertions the normal-buffer metrics are skipped entirely.
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
+ }
+
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+
+ return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
+ normalBufferCount, normalBufferSize);
+ }
+
+ /**
+ * Builds, but does not throw, the exception reporting that the platform lacks the unsafe
+ * access Arrow needs. Callers are expected to write {@code throw fail();}.
+ *
+ * @return a new {@code UnsupportedOperationException} carrying the explanatory message
+ */
+ private UnsupportedOperationException fail() {
+ return new UnsupportedOperationException(
+ "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
+ "didn't provide that functionality.");
+ }
+
+ /**
+ * Validates the requested capacities and allocates through {@code newDirectBufferL}.
+ *
+ * @throws IllegalArgumentException if {@code initialCapacity} is negative or exceeds
+ * {@code maxCapacity}
+ */
+ @Override
+ public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+ if (initialCapacity == 0 && maxCapacity == 0) {
+ // NOTE(review): the buffer returned by the inherited newDirectBuffer is discarded and
+ // never released, and execution still falls through to the allocation below. This looks
+ // like a leftover or a missing return/release -- confirm the intent with upstream Arrow.
+ newDirectBuffer(initialCapacity, maxCapacity);
+ }
+ validate(initialCapacity, maxCapacity);
+ return newDirectBufferL(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
+ }
+
+ private void validate(int initialCapacity, int maxCapacity) {
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)");
+ }
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+ initialCapacity, maxCapacity));
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(directArenas.length);
+ buf.append(" direct arena(s):");
+ buf.append(StringUtil.NEWLINE);
+ for (PoolArena<ByteBuffer> a : directArenas) {
+ buf.append(a);
+ }
+
+ buf.append("Large buffers outstanding: ");
+ buf.append(hugeBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(hugeBufferSize.get());
+ buf.append(" bytes.");
+ buf.append('\n');
+ buf.append("Normal buffers outstanding: ");
+ buf.append(normalBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(normalBufferSize.get());
+ buf.append(" bytes.");
+ return buf.toString();
+ }
+
+ /**
+ * Daemon thread named "allocation.logger" that periodically writes the allocator's
+ * {@code toString()} summary to the memory logger at trace level.
+ */
+ private class MemoryStatusThread extends Thread {
+ private final InnerAllocator allocator;
+
+ public MemoryStatusThread(InnerAllocator allocator) {
+ super("allocation.logger");
+ this.setDaemon(true);
+ this.allocator = allocator;
+ }
+
+ /**
+ * Logs once, sleeps for {@code MEMORY_LOGGER_FREQUENCY_SECONDS} seconds, and repeats until
+ * the thread is interrupted, at which point it exits.
+ */
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}", allocator);
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ // Interruption is the only way out of the loop; the thread simply ends.
+ return;
+ }
+ }
+ }
+ }
+
+ }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
new file mode 100644
index 0000000000..cad271c49b
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
+ * to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ public final long id = ID_GENERATOR.incrementAndGet();
+ private final AbstractByteBuf wrapped;
+ private final long memoryAddress;
+
+ UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+ this(buf, true);
+ }
+
+ UnsafeDirectLittleEndian(LargeBuffer buf) {
+ this(buf, true);
+ }
+
+ UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+ this(buf, true);
+ }
+
+ /**
+ * Shared constructor that the package-private typed constructors delegate to.
+ *
+ * @param buf the buffer to wrap; its memory address is read once here and cached, so it is
+ * assumed to stay valid for the life of this wrapper
+ * @param fake unused. NOTE(review): presumably present only to give this constructor a
+ * signature distinct from the typed overloads -- confirm
+ */
+ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+ super(buf);
+
+ this.wrapped = buf;
+ this.memoryAddress = buf.memoryAddress();
+ }
+
+ private long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ /**
+ * Reads eight bytes directly from {@code memoryAddress + index}.
+ *
+ * <p>No bounds check is performed on this read path, unlike {@code setLong}, which calls
+ * {@code wrapped.checkIndex(index, 8)} first.
+ */
+ @Override
+ public long getLong(int index) {
+ // Bounds checking (wrapped.checkIndex(index, 8)) is intentionally left disabled here.
+ long v = PlatformDependent.getLong(addr(index));
+ return v;
+ }
+
+ @Override
+ public float getFloat(int index) {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ @Override
+ public ByteBuf slice() {
+ return slice(this.readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ /**
+ * Ignores the requested byte order and returns this buffer unchanged.
+ */
+ @Override
+ public ByteBuf order(ByteOrder endianness) {
+ return this;
+ }
+
+ @Override
+ public double getDouble(int index) {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ @Override
+ public char getChar(int index) {
+ return (char) getShort(index);
+ }
+
+ @Override
+ public long getUnsignedInt(int index) {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ @Override
+ public int getInt(int index) {
+ int v = PlatformDependent.getInt(addr(index));
+ return v;
+ }
+
+ @Override
+ public int getUnsignedShort(int index) {
+ return getShort(index) & 0xFFFF;
+ }
+
+ @Override
+ public short getShort(int index) {
+ short v = PlatformDependent.getShort(addr(index));
+ return v;
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ wrapped.checkIndex(index, 2);
+ setShort_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ wrapped.checkIndex(index, 4);
+ setInt_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ wrapped.checkIndex(index, 8);
+ setLong_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setChar(int index, int value) {
+ setShort(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setFloat(int index, float value) {
+ setInt(index, Float.floatToRawIntBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf setDouble(int index, double value) {
+ setLong(index, Double.doubleToRawLongBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeShort(int value) {
+ wrapped.ensureWritable(2);
+ setShort_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 2;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeInt(int value) {
+ wrapped.ensureWritable(4);
+ setInt_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 4;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeLong(long value) {
+ wrapped.ensureWritable(8);
+ setLong_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 8;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeChar(int value) {
+ writeShort(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeFloat(float value) {
+ writeInt(Float.floatToRawIntBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeDouble(double value) {
+ writeLong(Double.doubleToRawLongBits(value));
+ return this;
+ }
+
+ private void setShort_(int index, int value) {
+ PlatformDependent.putShort(addr(index), (short) value);
+ }
+
+ private void setInt_(int index, int value) {
+ PlatformDependent.putInt(addr(index), value);
+ }
+
+ private void setLong_(int index, long value) {
+ PlatformDependent.putLong(addr(index), value);
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return PlatformDependent.getByte(addr(index));
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ PlatformDependent.putByte(addr(index), (byte) value);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ /**
+ * Copies up to {@code length} bytes from the stream into this buffer starting at
+ * {@code index}.
+ *
+ * <p>The stream is read with a single {@code read} call into a temporary array, so fewer than
+ * {@code length} bytes may be transferred.
+ *
+ * @return the value returned by {@code in.read}: the number of bytes copied, which may be
+ * less than {@code length}, or -1 at end of stream (nothing is copied in that case)
+ * @throws IOException if reading from the stream fails
+ */
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ wrapped.checkIndex(index, length);
+ byte[] tmp = new byte[length];
+ int readBytes = in.read(tmp);
+ if (readBytes > 0) {
+ PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
+ }
+ return readBytes;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ wrapped.checkIndex(index, length);
+ if (length != 0) {
+ byte[] tmp = new byte[length];
+ PlatformDependent.copyMemory(addr(index), tmp, 0, length);
+ out.write(tmp);
+ }
+ return this;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
+}