You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2024/01/17 15:41:52 UTC

(camel-quarkus) 03/06: Workaround BigQuery & Apache Arrow Netty incompatibilities #5641

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 7300cc0d98df87d639c6706ac8e40dcbbc896973
Author: James Netherton <ja...@gmail.com>
AuthorDate: Mon Jan 8 14:37:02 2024 +0000

    Workaround BigQuery & Apache Arrow Netty incompatibilities #5641
---
 .../src/main/java/io/netty/buffer/LargeBuffer.java |  34 ++
 .../io/netty/buffer/MutableWrappedByteBuf.java     | 447 +++++++++++++++++++++
 .../io/netty/buffer/PooledByteBufAllocatorL.java   | 275 +++++++++++++
 .../io/netty/buffer/UnsafeDirectLittleEndian.java  | 261 ++++++++++++
 4 files changed, 1017 insertions(+)

diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000000..306ce49433
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+/**
+ * Concrete {@link MutableWrappedByteBuf} whose only added behavior is that
+ * {@link #copy(int, int)} re-wraps the copied region in a new {@code LargeBuffer}.
+ *
+ * <p>
+ * This class holds no counters of its own; everything else is inherited from
+ * {@link MutableWrappedByteBuf}.
+ */
+public class LargeBuffer extends MutableWrappedByteBuf {
+
+    /**
+     * Wraps the given buffer.
+     *
+     * @param buffer the buffer to delegate to
+     */
+    public LargeBuffer(ByteBuf buffer) {
+        super(buffer);
+    }
+
+    /**
+     * Copies {@code length} bytes starting at {@code index} from the wrapped buffer and returns the
+     * copy wrapped in a new {@code LargeBuffer}.
+     */
+    @Override
+    public ByteBuf copy(int index, int length) {
+        final ByteBuf duplicate = buffer.copy(index, length);
+        return new LargeBuffer(duplicate);
+    }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000000..005a049f86
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.ByteProcessor;
+
+/**
+ * A delegating {@link ByteBuf} modelled on netty's DuplicatedByteBuf. The code is copied rather
+ * than inherited so that some behaviors can be overridden and so that the wrapped {@link #buffer}
+ * reference is a plain, non-final field.
+ *
+ * <p>
+ * Almost every operation forwards to the wrapped buffer; reader/writer indices are kept by this
+ * instance (they are seeded from the wrapped buffer in the constructor).
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+    /** The buffer all operations are forwarded to. */
+    ByteBuf buffer;
+
+    /**
+     * Creates a wrapper around {@code buffer}. If {@code buffer} is itself a
+     * {@code MutableWrappedByteBuf}, its inner buffer is used directly so wrappers do not stack.
+     *
+     * @param buffer the buffer to wrap; its max capacity and current indices are adopted
+     */
+    public MutableWrappedByteBuf(ByteBuf buffer) {
+        super(buffer.maxCapacity());
+
+        this.buffer = (buffer instanceof MutableWrappedByteBuf)
+                ? ((MutableWrappedByteBuf) buffer).buffer
+                : buffer;
+
+        // Indices are taken from the argument as passed in, not from the unwrapped inner buffer.
+        setIndex(buffer.readerIndex(), buffer.writerIndex());
+    }
+
+    // ---------------------------------------------------------------- buffer properties
+
+    @Override
+    public ByteBuffer nioBuffer(int index, int length) {
+        return unwrap().nioBuffer(index, length);
+    }
+
+    /** Returns the wrapped buffer. */
+    @Override
+    public ByteBuf unwrap() {
+        return buffer;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return buffer.alloc();
+    }
+
+    @Override
+    public ByteOrder order() {
+        return buffer.order();
+    }
+
+    @Override
+    public boolean isDirect() {
+        return buffer.isDirect();
+    }
+
+    @Override
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    /** Resizes the wrapped buffer and returns this wrapper (not the wrapped buffer). */
+    @Override
+    public ByteBuf capacity(int newCapacity) {
+        buffer.capacity(newCapacity);
+        return this;
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buffer.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buffer.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return buffer.arrayOffset();
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return buffer.hasMemoryAddress();
+    }
+
+    @Override
+    public long memoryAddress() {
+        return buffer.memoryAddress();
+    }
+
+    // ---------------------------------------------------------------- primitive getters
+    // The public getters route through the matching _getXxx hook, except for the LE variants,
+    // which call the wrapped buffer directly.
+
+    @Override
+    public byte getByte(int index) {
+        return _getByte(index);
+    }
+
+    @Override
+    protected byte _getByte(int index) {
+        return buffer.getByte(index);
+    }
+
+    @Override
+    public short getShort(int index) {
+        return _getShort(index);
+    }
+
+    @Override
+    protected short _getShort(int index) {
+        return buffer.getShort(index);
+    }
+
+    @Override
+    public short getShortLE(int index) {
+        return buffer.getShortLE(index);
+    }
+
+    @Override
+    protected short _getShortLE(int index) {
+        return buffer.getShortLE(index);
+    }
+
+    @Override
+    public int getUnsignedMedium(int index) {
+        return _getUnsignedMedium(index);
+    }
+
+    @Override
+    protected int _getUnsignedMedium(int index) {
+        return buffer.getUnsignedMedium(index);
+    }
+
+    @Override
+    public int getUnsignedMediumLE(int index) {
+        return buffer.getUnsignedMediumLE(index);
+    }
+
+    @Override
+    protected int _getUnsignedMediumLE(int index) {
+        return buffer.getUnsignedMediumLE(index);
+    }
+
+    @Override
+    public int getInt(int index) {
+        return _getInt(index);
+    }
+
+    @Override
+    protected int _getInt(int index) {
+        return buffer.getInt(index);
+    }
+
+    @Override
+    public int getIntLE(int index) {
+        return buffer.getIntLE(index);
+    }
+
+    @Override
+    protected int _getIntLE(int index) {
+        return buffer.getIntLE(index);
+    }
+
+    @Override
+    public long getLong(int index) {
+        return _getLong(index);
+    }
+
+    @Override
+    protected long _getLong(int index) {
+        return buffer.getLong(index);
+    }
+
+    @Override
+    public long getLongLE(int index) {
+        return buffer.getLongLE(index);
+    }
+
+    @Override
+    protected long _getLongLE(int index) {
+        return buffer.getLongLE(index);
+    }
+
+    // ---------------------------------------------------------------- copy / slice
+
+    /** Left abstract so each subclass decides which wrapper type a copy is returned in. */
+    @Override
+    public abstract ByteBuf copy(int index, int length);
+
+    /** Returns a {@link SlicedByteBuf} view over this wrapper (not over the wrapped buffer). */
+    @Override
+    public ByteBuf slice(int index, int length) {
+        return new SlicedByteBuf(this, index, length);
+    }
+
+    // ---------------------------------------------------------------- bulk getters (in-memory)
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+        buffer.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+        buffer.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        buffer.getBytes(index, dst);
+        return this;
+    }
+
+    // ---------------------------------------------------------------- primitive setters
+    // Same routing as the getters: non-LE setters go through the _setXxx hook, LE setters call the
+    // wrapped buffer directly. All public setters return this wrapper for chaining.
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+        _setByte(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setByte(int index, int value) {
+        buffer.setByte(index, value);
+    }
+
+    @Override
+    public ByteBuf setShort(int index, int value) {
+        _setShort(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setShort(int index, int value) {
+        buffer.setShort(index, value);
+    }
+
+    @Override
+    public ByteBuf setShortLE(int index, int value) {
+        buffer.setShortLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setShortLE(int index, int value) {
+        buffer.setShortLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setMedium(int index, int value) {
+        _setMedium(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setMedium(int index, int value) {
+        buffer.setMedium(index, value);
+    }
+
+    @Override
+    public ByteBuf setMediumLE(int index, int value) {
+        buffer.setMediumLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setMediumLE(int index, int value) {
+        buffer.setMediumLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setInt(int index, int value) {
+        _setInt(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setInt(int index, int value) {
+        buffer.setInt(index, value);
+    }
+
+    @Override
+    public ByteBuf setIntLE(int index, int value) {
+        buffer.setIntLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setIntLE(int index, int value) {
+        buffer.setIntLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setLong(int index, long value) {
+        _setLong(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setLong(int index, long value) {
+        buffer.setLong(index, value);
+    }
+
+    @Override
+    public ByteBuf setLongLE(int index, long value) {
+        buffer.setLongLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setLongLE(int index, long value) {
+        buffer.setLongLE(index, value);
+    }
+
+    // ---------------------------------------------------------------- bulk setters (in-memory)
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+        buffer.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+        buffer.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuffer src) {
+        buffer.setBytes(index, src);
+        return this;
+    }
+
+    // ---------------------------------------------------------------- stream / channel transfers
+
+    @Override
+    public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+        return buffer.setBytes(index, in, position, length);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        buffer.getBytes(index, out, length);
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+        return buffer.getBytes(index, out, length);
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        return buffer.setBytes(index, in, length);
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+        return buffer.setBytes(index, in, length);
+    }
+
+    @Override
+    public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+        return buffer.getBytes(index, out, position, length);
+    }
+
+    // ---------------------------------------------------------------- NIO views
+
+    @Override
+    public int nioBufferCount() {
+        return buffer.nioBufferCount();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return buffer.nioBuffers(index, length);
+    }
+
+    /** Implemented via {@link #nioBuffer(int, int)}; no separate internal buffer is kept here. */
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        return nioBuffer(index, length);
+    }
+
+    // ---------------------------------------------------------------- byte iteration
+
+    @Override
+    public int forEachByte(int index, int length, ByteProcessor processor) {
+        return buffer.forEachByte(index, length, processor);
+    }
+
+    @Override
+    public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+        return buffer.forEachByteDesc(index, length, processor);
+    }
+
+    // ---------------------------------------------------------------- reference counting
+    // Reference counting is forwarded to whatever unwrap() returns; this wrapper keeps no count
+    // of its own.
+
+    @Override
+    public final int refCnt() {
+        return unwrap().refCnt();
+    }
+
+    @Override
+    public final ByteBuf touch() {
+        unwrap().touch();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf touch(Object hint) {
+        unwrap().touch(hint);
+        return this;
+    }
+
+    @Override
+    public final ByteBuf retain() {
+        unwrap().retain();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf retain(int increment) {
+        unwrap().retain(increment);
+        return this;
+    }
+
+    /** Equivalent to {@code release(1)}. */
+    @Override
+    public boolean release() {
+        return release(1);
+    }
+
+    /**
+     * Releases {@code decrement} references on the buffer returned by {@link #unwrap()}.
+     *
+     * @return whatever the underlying {@code release(int)} call returned
+     */
+    @Override
+    public boolean release(int decrement) {
+        return unwrap().release(decrement);
+    }
+
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 0000000000..1202f003db
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.OutOfDirectMemoryError;
+import io.netty.util.internal.StringUtil;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+
+/**
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
+ *
+ * <p>
+ * Allocation is delegated to a private {@link PooledByteBufAllocator} subclass. Requests larger
+ * than the allocator's chunk size are served by {@link UnpooledByteBufAllocator#DEFAULT} and
+ * tracked as "huge" buffers; smaller requests come from the calling thread's direct arena and are
+ * tracked as "normal" buffers only when {@code ASSERT_ENABLED} is true.
+ */
+public class PooledByteBufAllocatorL {
+
+    private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
+
+    private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
+    /** A shared wrapper around {@link Unpooled#EMPTY_BUFFER}. */
+    public final UnsafeDirectLittleEndian empty;
+    private final AtomicLong hugeBufferSize = new AtomicLong(0);
+    private final AtomicLong hugeBufferCount = new AtomicLong(0);
+    private final AtomicLong normalBufferSize = new AtomicLong(0);
+    private final AtomicLong normalBufferCount = new AtomicLong(0);
+    private final InnerAllocator allocator;
+
+    public PooledByteBufAllocatorL() {
+        allocator = new InnerAllocator();
+        empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+    }
+
+    /**
+     * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
+     *
+     * @param  size                 requested capacity in bytes; narrowed to {@code int} via
+     *                              {@link LargeMemoryUtil#checkedCastToInt(long)}
+     * @return                      the newly allocated buffer
+     * @throws OutOfMemoryException if the allocation failed with an {@link OutOfDirectMemoryError} or with an
+     *                              {@link OutOfMemoryError} whose message is {@code "Direct buffer memory"}
+     */
+    public UnsafeDirectLittleEndian allocate(long size) {
+        try {
+            return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE);
+        } catch (OutOfMemoryError e) {
+            /*
+             * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
+             * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
+             * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
+             * as Netty is expected to throw an OutOfDirectMemoryError first.
+             */
+            if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) {
+                throw new OutOfMemoryException("Failure allocating buffer.", e);
+            }
+            // Any other OutOfMemoryError (e.g. heap exhaustion) is propagated untouched.
+            throw e;
+        }
+    }
+
+    /** Returns the chunk size of the underlying pooled allocator. */
+    public int getChunkSize() {
+        return allocator.chunkSize();
+    }
+
+    /** Returns the total capacity, in bytes, of outstanding huge (unpooled) buffers. */
+    public long getHugeBufferSize() {
+        return hugeBufferSize.get();
+    }
+
+    /** Returns the number of outstanding huge (unpooled) buffers. */
+    public long getHugeBufferCount() {
+        return hugeBufferCount.get();
+    }
+
+    /** Returns the total capacity, in bytes, of outstanding accounted normal (pooled) buffers. */
+    public long getNormalBufferSize() {
+        return normalBufferSize.get();
+    }
+
+    /** Returns the number of outstanding accounted normal (pooled) buffers. */
+    public long getNormalBufferCount() {
+        // Previously returned normalBufferSize, reporting bytes where a buffer count was expected.
+        return normalBufferCount.get();
+    }
+
+    /**
+     * An {@link UnsafeDirectLittleEndian} that, once fully released, decrements the count/size
+     * counters it was created with. Copying is not supported.
+     */
+    private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
+        /** Capacity at construction time; this is the amount subtracted from {@link #size} on release. */
+        private final long initialCapacity;
+        private final AtomicLong count;
+        private final AtomicLong size;
+
+        private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+            super(buf);
+            this.initialCapacity = buf.capacity();
+            this.count = count;
+            this.size = size;
+        }
+
+        private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+                AtomicLong size) {
+            super(buf);
+            this.initialCapacity = buf.capacity();
+            this.count = count;
+            this.size = size;
+        }
+
+        @Override
+        public ByteBuf copy() {
+            throw new UnsupportedOperationException("copy method is not supported");
+        }
+
+        @Override
+        public ByteBuf copy(int index, int length) {
+            throw new UnsupportedOperationException("copy method is not supported");
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            boolean released = super.release(decrement);
+            // Only adjust the counters when the underlying release reported true.
+            if (released) {
+                count.decrementAndGet();
+                size.addAndGet(-initialCapacity);
+            }
+            return released;
+        }
+
+    }
+
+    /**
+     * The pooled allocator that actually hands out memory. Only direct buffers are supported.
+     */
+    private class InnerAllocator extends PooledByteBufAllocator {
+
+        /** The parent's private {@code directArenas} array, obtained reflectively for {@link #toString()}. */
+        private final PoolArena<ByteBuffer>[] directArenas;
+        /** Periodic usage logger; {@code null} unless trace logging was enabled at construction. */
+        private final MemoryStatusThread statusThread;
+
+        public InnerAllocator() {
+            super(true);
+
+            try {
+                Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+                f.setAccessible(true);
+                this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+            } catch (Exception e) {
+                throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+            }
+
+            if (memoryLogger.isTraceEnabled()) {
+                statusThread = new MemoryStatusThread(this);
+                statusThread.start();
+            } else {
+                statusThread = null;
+            }
+        }
+
+        /**
+         * Allocates a direct buffer, choosing between an unpooled "huge" allocation and the
+         * calling thread's direct arena depending on {@code initialCapacity}.
+         *
+         * @throws UnsupportedOperationException if the thread cache has no direct arena, or the arena
+         *                                       returned something other than a
+         *                                       {@link PooledUnsafeDirectByteBuf}
+         */
+        private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+            PoolThreadCache cache = threadCache();
+            PoolArena<ByteBuffer> directArena = cache.directArena;
+
+            if (directArena != null) {
+
+                if (initialCapacity > chunkSize()) {
+                    // This is beyond chunk size so we'll allocate separately.
+                    ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+                    hugeBufferSize.addAndGet(buf.capacity());
+                    hugeBufferCount.incrementAndGet();
+
+                    return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
+                            hugeBufferSize);
+                } else {
+                    // within chunk, use arena.
+                    ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+                    if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+                        // fail() only builds the exception; it has to be thrown here, otherwise
+                        // execution falls through to the casts below and dies with a
+                        // ClassCastException instead of the intended message.
+                        throw fail();
+                    }
+
+                    // Without assertions the buffer is returned unaccounted, so the "normal"
+                    // counters stay untouched on this path.
+                    if (!ASSERT_ENABLED) {
+                        return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
+                    }
+
+                    normalBufferSize.addAndGet(buf.capacity());
+                    normalBufferCount.incrementAndGet();
+
+                    return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
+                            normalBufferCount, normalBufferSize);
+                }
+
+            } else {
+                throw fail();
+            }
+        }
+
+        /** Builds (but does not throw) the exception used when unsafe direct buffers are unavailable. */
+        private UnsupportedOperationException fail() {
+            return new UnsupportedOperationException(
+                    "Arrow requires that the JVM used supports access sun.misc.Unsafe.  This platform " +
+                            "didn't provide that functionality.");
+        }
+
+        @Override
+        public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+            // NOTE(review): the result of newDirectBuffer is discarded here and allocation then
+            // continues below regardless; the purpose of this call is not evident from this file.
+            // Left as-is - confirm the intent before changing it.
+            if (initialCapacity == 0 && maxCapacity == 0) {
+                newDirectBuffer(initialCapacity, maxCapacity);
+            }
+            validate(initialCapacity, maxCapacity);
+            return newDirectBufferL(initialCapacity, maxCapacity);
+        }
+
+        @Override
+        public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+            throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
+        }
+
+        /**
+         * Rejects a negative {@code initialCapacity} or one that exceeds {@code maxCapacity}.
+         *
+         * @throws IllegalArgumentException if either check fails
+         */
+        private void validate(int initialCapacity, int maxCapacity) {
+            if (initialCapacity < 0) {
+                throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)");
+            }
+            if (initialCapacity > maxCapacity) {
+                throw new IllegalArgumentException(String.format(
+                        "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+                        initialCapacity, maxCapacity));
+            }
+        }
+
+        /** Renders every direct arena followed by the outstanding huge/normal buffer counters. */
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            buf.append(directArenas.length);
+            buf.append(" direct arena(s):");
+            buf.append(StringUtil.NEWLINE);
+            for (PoolArena<ByteBuffer> a : directArenas) {
+                buf.append(a);
+            }
+
+            buf.append("Large buffers outstanding: ");
+            buf.append(hugeBufferCount.get());
+            buf.append(" totaling ");
+            buf.append(hugeBufferSize.get());
+            buf.append(" bytes.");
+            buf.append('\n');
+            buf.append("Normal buffers outstanding: ");
+            buf.append(normalBufferCount.get());
+            buf.append(" totaling ");
+            buf.append(normalBufferSize.get());
+            buf.append(" bytes.");
+            return buf.toString();
+        }
+
+        /**
+         * Daemon thread that trace-logs the allocator's {@link #toString()} every
+         * {@code MEMORY_LOGGER_FREQUENCY_SECONDS} seconds until it is interrupted.
+         */
+        private class MemoryStatusThread extends Thread {
+            private final InnerAllocator allocator;
+
+            public MemoryStatusThread(InnerAllocator allocator) {
+                super("allocation.logger");
+                this.setDaemon(true);
+                this.allocator = allocator;
+            }
+
+            @Override
+            public void run() {
+                while (true) {
+                    memoryLogger.trace("Memory Usage: \n{}", allocator);
+                    try {
+                        Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+                    } catch (InterruptedException e) {
+                        // Interruption is the only stop signal; the thread simply ends.
+                        return;
+                    }
+                }
+            }
+        }
+
+    }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
new file mode 100644
index 0000000000..cad271c49b
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Buffer wrapper that reads and writes primitives straight at the wrapped buffer's memory address
+ * through {@link PlatformDependent}. It sits underneath ArrowBufs so that callers do not deal with
+ * the Netty buffer classes or Netty's memory management directly.
+ *
+ * <p>
+ * The {@code getXxx(int)} accessors and {@code setByte} perform no bounds checking; the multi-byte
+ * {@code setXxx} and the stream transfer methods call {@code checkIndex} on the wrapped buffer
+ * first. NOTE(review): the byte order used is whatever the {@code PlatformDependent} accessors
+ * apply - presumably native order; confirm before relying on the "little endian" in the name.
+ */
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
+    /** Identifier assigned from a process-wide counter when the instance is created. */
+    public final long id = ID_GENERATOR.incrementAndGet();
+    private final AbstractByteBuf wrapped;
+    /** Base address captured once from the wrapped buffer at construction time. */
+    private final long memoryAddress;
+
+    UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+        this(buf, true);
+    }
+
+    UnsafeDirectLittleEndian(LargeBuffer buf) {
+        this(buf, true);
+    }
+
+    UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+        this(buf, true);
+    }
+
+    /**
+     * Shared constructor behind the three typed ones.
+     *
+     * @param buf  the buffer to wrap
+     * @param fake unused; it only gives this constructor a signature distinct from the typed overloads
+     */
+    private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+        super(buf);
+
+        this.wrapped = buf;
+        this.memoryAddress = buf.memoryAddress();
+    }
+
+    /** Translates a buffer index into an absolute memory address. */
+    private long addr(int index) {
+        return memoryAddress + index;
+    }
+
+    // ---------------------------------------------------------------- unchecked getters
+
+    @Override
+    public long getLong(int index) {
+        // No index check is performed on this path.
+        return PlatformDependent.getLong(addr(index));
+    }
+
+    @Override
+    public float getFloat(int index) {
+        return Float.intBitsToFloat(getInt(index));
+    }
+
+    /** Slices the readable region, i.e. from the reader index for {@code readableBytes()} bytes. */
+    @Override
+    public ByteBuf slice() {
+        return slice(this.readerIndex(), readableBytes());
+    }
+
+    @Override
+    public ByteBuf slice(int index, int length) {
+        return new SlicedByteBuf(this, index, length);
+    }
+
+    /** Ignores the requested byte order and always returns this same instance. */
+    @Override
+    public ByteBuf order(ByteOrder endianness) {
+        return this;
+    }
+
+    @Override
+    public double getDouble(int index) {
+        return Double.longBitsToDouble(getLong(index));
+    }
+
+    @Override
+    public char getChar(int index) {
+        return (char) getShort(index);
+    }
+
+    @Override
+    public long getUnsignedInt(int index) {
+        return getInt(index) & 0xFFFFFFFFL;
+    }
+
+    @Override
+    public int getInt(int index) {
+        return PlatformDependent.getInt(addr(index));
+    }
+
+    @Override
+    public int getUnsignedShort(int index) {
+        return getShort(index) & 0xFFFF;
+    }
+
+    @Override
+    public short getShort(int index) {
+        return PlatformDependent.getShort(addr(index));
+    }
+
+    // ---------------------------------------------------------------- index-checked setters
+
+    @Override
+    public ByteBuf setShort(int index, int value) {
+        wrapped.checkIndex(index, 2);
+        setShort_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setInt(int index, int value) {
+        wrapped.checkIndex(index, 4);
+        setInt_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setLong(int index, long value) {
+        wrapped.checkIndex(index, 8);
+        setLong_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setChar(int index, int value) {
+        setShort(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setFloat(int index, float value) {
+        setInt(index, Float.floatToRawIntBits(value));
+        return this;
+    }
+
+    @Override
+    public ByteBuf setDouble(int index, double value) {
+        setLong(index, Double.doubleToRawLongBits(value));
+        return this;
+    }
+
+    // ---------------------------------------------------------------- relative writes
+    // Each write ensures capacity on the wrapped buffer, stores at its writer index, then advances
+    // that writer index by the width of the value.
+
+    @Override
+    public ByteBuf writeShort(int value) {
+        wrapped.ensureWritable(2);
+        setShort_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 2;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeInt(int value) {
+        wrapped.ensureWritable(4);
+        setInt_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 4;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeLong(long value) {
+        wrapped.ensureWritable(8);
+        setLong_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 8;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeChar(int value) {
+        writeShort(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeFloat(float value) {
+        writeInt(Float.floatToRawIntBits(value));
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeDouble(double value) {
+        writeLong(Double.doubleToRawLongBits(value));
+        return this;
+    }
+
+    // ---------------------------------------------------------------- raw stores (no checks)
+
+    private void setShort_(int index, int value) {
+        PlatformDependent.putShort(addr(index), (short) value);
+    }
+
+    private void setInt_(int index, int value) {
+        PlatformDependent.putInt(addr(index), value);
+    }
+
+    private void setLong_(int index, long value) {
+        PlatformDependent.putLong(addr(index), value);
+    }
+
+    @Override
+    public byte getByte(int index) {
+        return PlatformDependent.getByte(addr(index));
+    }
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+        PlatformDependent.putByte(addr(index), (byte) value);
+        return this;
+    }
+
+    /** Equivalent to {@code release(1)}. */
+    @Override
+    public boolean release() {
+        return release(1);
+    }
+
+    // ---------------------------------------------------------------- stream transfers
+
+    /**
+     * Performs a single {@code read} from {@code in} into a temporary array of {@code length} bytes
+     * and copies whatever was read to {@code index}.
+     *
+     * @return the value returned by {@link InputStream#read(byte[])}; nothing is copied unless it is
+     *         positive
+     */
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        wrapped.checkIndex(index, length);
+        final byte[] scratch = new byte[length];
+        final int count = in.read(scratch);
+        if (count > 0) {
+            PlatformDependent.copyMemory(scratch, 0, addr(index), count);
+        }
+        return count;
+    }
+
+    /**
+     * Copies {@code length} bytes starting at {@code index} into a temporary array and writes that
+     * array to {@code out}. Nothing is written when {@code length} is zero.
+     */
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        wrapped.checkIndex(index, length);
+        if (length == 0) {
+            return this;
+        }
+        final byte[] scratch = new byte[length];
+        PlatformDependent.copyMemory(addr(index), scratch, 0, length);
+        out.write(scratch);
+        return this;
+    }
+
+    /** Identity-based hash, consistent with the identity-based {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return System.identityHashCode(this);
+    }
+
+    /** Two instances are equal only if they are the same object. */
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj;
+    }
+}