You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2024/01/08 15:37:24 UTC

(camel-quarkus) branch quarkus-main updated (320040f582 -> 5f088c4e47)

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a change to branch quarkus-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


 discard 320040f582 Workaround BigQuery & Apache Arrow Netty imcompatibilities #5641
 discard 525ac4b747 Disable OpenTelemetryDisabledTest due to quarkusio/quarkus#38084
     new 5f088c4e47 Workaround BigQuery & Apache Arrow Netty imcompatibilities #5641

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (320040f582)
            \
             N -- N -- N   refs/heads/quarkus-main (5f088c4e47)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/opentelemetry/deployment/OpenTelemetryDisabledTest.java   | 2 --
 1 file changed, 2 deletions(-)


(camel-quarkus) 01/01: Workaround BigQuery & Apache Arrow Netty imcompatibilities #5641

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch quarkus-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 5f088c4e471b2e5c3eaff178cc0214b30acb8bca
Author: James Netherton <ja...@gmail.com>
AuthorDate: Mon Jan 8 14:37:02 2024 +0000

    Workaround BigQuery & Apache Arrow Netty imcompatibilities #5641
---
 .../src/main/java/io/netty/buffer/LargeBuffer.java |  34 ++
 .../io/netty/buffer/MutableWrappedByteBuf.java     | 447 +++++++++++++++++++++
 .../io/netty/buffer/PooledByteBufAllocatorL.java   | 275 +++++++++++++
 .../io/netty/buffer/UnsafeDirectLittleEndian.java  | 261 ++++++++++++
 4 files changed, 1017 insertions(+)

diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000000..1d81b725b4
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+/**
+ * Concrete {@link MutableWrappedByteBuf} whose only addition is {@link #copy(int, int)}. It holds
+ * no counters itself; the huge-buffer byte/count metrics are fields of PooledByteBufAllocatorL.
+ */
+public class LargeBuffer extends MutableWrappedByteBuf {
+
+    public LargeBuffer(ByteBuf buffer) {
+        super(buffer);
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+        return new LargeBuffer(buffer.copy(index, length));
+    }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000000..2bee1bc509
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.ByteProcessor;
+
+/**
+ * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override
+ * some behaviors and make buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+    ByteBuf buffer;
+
+    public MutableWrappedByteBuf(ByteBuf buffer) {
+        super(buffer.maxCapacity());
+        // Never stack wrappers: when handed another wrapper, share its inner buffer instead.
+        if (buffer instanceof MutableWrappedByteBuf) {
+            this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+        } else {
+            this.buffer = buffer;
+        }
+        // Start with the reader/writer indices of the buffer that was passed in.
+        setIndex(buffer.readerIndex(), buffer.writerIndex());
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(int index, int length) {
+        return unwrap().nioBuffer(index, length);
+    }
+
+    @Override
+    public ByteBuf unwrap() {
+        return buffer;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return buffer.alloc();
+    }
+
+    @Override
+    public ByteOrder order() {
+        return buffer.order();
+    }
+
+    @Override
+    public boolean isDirect() {
+        return buffer.isDirect();
+    }
+
+    @Override
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public ByteBuf capacity(int newCapacity) {
+        buffer.capacity(newCapacity);
+        return this;
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buffer.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buffer.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return buffer.arrayOffset();
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return buffer.hasMemoryAddress();
+    }
+
+    @Override
+    public long memoryAddress() {
+        return buffer.memoryAddress();
+    }
+
+    @Override
+    public byte getByte(int index) {
+        return _getByte(index);
+    }
+
+    @Override
+    protected byte _getByte(int index) {
+        return buffer.getByte(index);
+    }
+
+    @Override
+    public short getShort(int index) {
+        return _getShort(index);
+    }
+
+    @Override
+    protected short _getShort(int index) {
+        return buffer.getShort(index);
+    }
+
+    @Override
+    public short getShortLE(int index) {
+        return buffer.getShortLE(index);
+    }
+
+    @Override
+    protected short _getShortLE(int index) {
+        return buffer.getShortLE(index);
+    }
+
+    @Override
+    public int getUnsignedMedium(int index) {
+        return _getUnsignedMedium(index);
+    }
+
+    @Override
+    protected int _getUnsignedMedium(int index) {
+        return buffer.getUnsignedMedium(index);
+    }
+
+    @Override
+    public int getUnsignedMediumLE(int index) {
+        return buffer.getUnsignedMediumLE(index);
+    }
+
+    @Override
+    protected int _getUnsignedMediumLE(int index) {
+        return buffer.getUnsignedMediumLE(index);
+    }
+
+    @Override
+    public int getInt(int index) {
+        return _getInt(index);
+    }
+
+    @Override
+    protected int _getInt(int index) {
+        return buffer.getInt(index);
+    }
+
+    @Override
+    public int getIntLE(int index) {
+        return buffer.getIntLE(index);
+    }
+
+    @Override
+    protected int _getIntLE(int index) {
+        return buffer.getIntLE(index);
+    }
+
+    @Override
+    public long getLong(int index) {
+        return _getLong(index);
+    }
+
+    @Override
+    protected long _getLong(int index) {
+        return buffer.getLong(index);
+    }
+
+    @Override
+    public long getLongLE(int index) {
+        return buffer.getLongLE(index);
+    }
+
+    @Override
+    protected long _getLongLE(int index) {
+        return buffer.getLongLE(index);
+    }
+
+    @Override
+    public abstract ByteBuf copy(int index, int length);
+
+    @Override
+    public ByteBuf slice(int index, int length) {
+        return new SlicedByteBuf(this, index, length);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+        buffer.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+        buffer.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        buffer.getBytes(index, dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+        _setByte(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setByte(int index, int value) {
+        buffer.setByte(index, value);
+    }
+
+    @Override
+    public ByteBuf setShort(int index, int value) {
+        _setShort(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setShort(int index, int value) {
+        buffer.setShort(index, value);
+    }
+
+    @Override
+    public ByteBuf setShortLE(int index, int value) {
+        buffer.setShortLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setShortLE(int index, int value) {
+        buffer.setShortLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setMedium(int index, int value) {
+        _setMedium(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setMedium(int index, int value) {
+        buffer.setMedium(index, value);
+    }
+
+    @Override
+    public ByteBuf setMediumLE(int index, int value) {
+        buffer.setMediumLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setMediumLE(int index, int value) {
+        buffer.setMediumLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setInt(int index, int value) {
+        _setInt(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setInt(int index, int value) {
+        buffer.setInt(index, value);
+    }
+
+    @Override
+    public ByteBuf setIntLE(int index, int value) {
+        buffer.setIntLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setIntLE(int index, int value) {
+        buffer.setIntLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setLong(int index, long value) {
+        _setLong(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setLong(int index, long value) {
+        buffer.setLong(index, value);
+    }
+
+    @Override
+    public ByteBuf setLongLE(int index, long value) {
+        buffer.setLongLE(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setLongLE(int index, long value) {
+        buffer.setLongLE(index, value);
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+        buffer.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+        buffer.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuffer src) {
+        buffer.setBytes(index, src);
+        return this;
+    }
+
+    @Override
+    public int setBytes(int index, FileChannel in, long position, int length)
+            throws IOException {
+        return buffer.setBytes(index, in, position, length);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length)
+            throws IOException {
+        buffer.getBytes(index, out, length);
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length)
+            throws IOException {
+        return buffer.getBytes(index, out, length);
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length)
+            throws IOException {
+        return buffer.setBytes(index, in, length);
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length)
+            throws IOException {
+        return buffer.setBytes(index, in, length);
+    }
+
+    @Override
+    public int getBytes(int index, FileChannel out, long position, int length)
+            throws IOException {
+        return buffer.getBytes(index, out, position, length);
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return buffer.nioBufferCount();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return buffer.nioBuffers(index, length);
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        return nioBuffer(index, length);
+    }
+
+    @Override
+    public int forEachByte(int index, int length, ByteProcessor processor) {
+        return buffer.forEachByte(index, length, processor);
+    }
+
+    @Override
+    public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+        return buffer.forEachByteDesc(index, length, processor);
+    }
+
+    @Override
+    public final int refCnt() {
+        return unwrap().refCnt();
+    }
+
+    @Override
+    public final ByteBuf touch() {
+        unwrap().touch();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf touch(Object hint) {
+        unwrap().touch(hint);
+        return this;
+    }
+
+    @Override
+    public final ByteBuf retain() {
+        unwrap().retain();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf retain(int increment) {
+        unwrap().retain(increment);
+        return this;
+    }
+
+    @Override
+    public boolean release() {
+        return release(1);
+    }
+
+    @Override
+    public boolean release(int decrement) {
+        // Reference counting is delegated to the wrapped buffer, as in refCnt() and retain().
+        return unwrap().release(decrement);
+    }
+
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 0000000000..16025d8655
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.OutOfDirectMemoryError;
+import io.netty.util.internal.StringUtil;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+
+/**
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
+
+    private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
+
+    private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+    public final UnsafeDirectLittleEndian empty;
+    private final AtomicLong hugeBufferSize = new AtomicLong(0);
+    private final AtomicLong hugeBufferCount = new AtomicLong(0);
+    private final AtomicLong normalBufferSize = new AtomicLong(0);
+    private final AtomicLong normalBufferCount = new AtomicLong(0);
+    private final InnerAllocator allocator;
+
+    public PooledByteBufAllocatorL() {
+        allocator = new InnerAllocator();
+        empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+    }
+
+    /**
+     * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
+     */
+    public UnsafeDirectLittleEndian allocate(long size) {
+        try {
+            return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE);
+        } catch (OutOfMemoryError e) {
+            /*
+             * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
+             * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
+             * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
+             * as Netty is expected to throw an OutOfDirectMemoryError first.
+             */
+            if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) {
+                throw new OutOfMemoryException("Failure allocating buffer.", e);
+            }
+            throw e;
+        }
+    }
+
+    public int getChunkSize() {
+        return allocator.chunkSize();
+    }
+
+    public long getHugeBufferSize() {
+        return hugeBufferSize.get();
+    }
+
+    public long getHugeBufferCount() {
+        return hugeBufferCount.get();
+    }
+
+    public long getNormalBufferSize() {
+        return normalBufferSize.get();
+    }
+
+    public long getNormalBufferCount() {
+        return normalBufferCount.get(); // number of outstanding pooled buffers, not their total bytes
+    }
+
+    private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
+        private final long initialCapacity;
+        private final AtomicLong count;
+        private final AtomicLong size;
+
+        private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+            super(buf);
+            this.initialCapacity = buf.capacity();
+            this.count = count;
+            this.size = size;
+        }
+
+        private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+                AtomicLong size) {
+            super(buf);
+            this.initialCapacity = buf.capacity();
+            this.count = count;
+            this.size = size;
+        }
+
+        @Override
+        public ByteBuf copy() {
+            throw new UnsupportedOperationException("copy method is not supported");
+        }
+
+        @Override
+        public ByteBuf copy(int index, int length) {
+            throw new UnsupportedOperationException("copy method is not supported");
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            boolean released = super.release(decrement);
+            if (released) {
+                count.decrementAndGet();
+                size.addAndGet(-initialCapacity);
+            }
+            return released;
+        }
+
+    }
+
+    private class InnerAllocator extends PooledByteBufAllocator {
+
+        private final PoolArena<ByteBuffer>[] directArenas;
+        private final MemoryStatusThread statusThread;
+
+        public InnerAllocator() {
+            super(true);
+
+            try {
+                Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+                f.setAccessible(true);
+                this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+            } catch (Exception e) {
+                throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+            }
+
+            if (memoryLogger.isTraceEnabled()) {
+                statusThread = new MemoryStatusThread(this);
+                statusThread.start();
+            } else {
+                statusThread = null;
+            }
+        }
+
+        /** Picks the allocation path: unpooled when larger than a chunk, else the thread's direct arena. */
+        private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+            PoolThreadCache cache = threadCache();
+            PoolArena<ByteBuffer> directArena = cache.directArena;
+            if (directArena == null) {
+                throw fail();
+            }
+
+            if (initialCapacity > chunkSize()) {
+                // This is beyond chunk size so we'll allocate separately.
+                ByteBuf hugeBuf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+                hugeBufferSize.addAndGet(hugeBuf.capacity());
+                hugeBufferCount.incrementAndGet();
+
+                return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(hugeBuf), hugeBufferCount,
+                        hugeBufferSize);
+            }
+
+            // within chunk, use arena.
+            ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+            if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+                // fail() only builds the exception, so it has to be thrown here; the pooled
+                // buffer is handed back first so the failed allocation does not leak it.
+                buf.release();
+                throw fail();
+            }
+
+            if (!ASSERT_ENABLED) { // the accounting wrapper below is only used when ASSERT_ENABLED is set
+                return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
+            }
+
+            normalBufferSize.addAndGet(buf.capacity());
+            normalBufferCount.incrementAndGet();
+
+            return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
+                    normalBufferCount, normalBufferSize);
+        }
+
+        private UnsupportedOperationException fail() {
+            return new UnsupportedOperationException(
+                    "Arrow requires that the JVM used supports access sun.misc.Unsafe.  This platform " +
+                            "didn't provide that functionality.");
+        }
+
+        @Override
+        public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+            if (initialCapacity == 0 && maxCapacity == 0) {
+                newDirectBuffer(initialCapacity, maxCapacity); // NOTE(review): result is discarded and never released here — confirm intent
+            }
+            validate(initialCapacity, maxCapacity); // rejects a negative initialCapacity or initialCapacity > maxCapacity
+            return newDirectBufferL(initialCapacity, maxCapacity);
+        }
+
+        @Override
+        public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+            throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
+        }
+
+        private void validate(int initialCapacity, int maxCapacity) {
+            if (initialCapacity < 0) {
+                throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)");
+            }
+            if (initialCapacity > maxCapacity) {
+                throw new IllegalArgumentException(String.format(
+                        "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+                        initialCapacity, maxCapacity));
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            buf.append(directArenas.length);
+            buf.append(" direct arena(s):");
+            buf.append(StringUtil.NEWLINE);
+            for (PoolArena<ByteBuffer> a : directArenas) {
+                buf.append(a);
+            }
+
+            buf.append("Large buffers outstanding: ");
+            buf.append(hugeBufferCount.get());
+            buf.append(" totaling ");
+            buf.append(hugeBufferSize.get());
+            buf.append(" bytes.");
+            buf.append('\n');
+            buf.append("Normal buffers outstanding: ");
+            buf.append(normalBufferCount.get());
+            buf.append(" totaling ");
+            buf.append(normalBufferSize.get());
+            buf.append(" bytes.");
+            return buf.toString();
+        }
+
+        private class MemoryStatusThread extends Thread {
+            private final InnerAllocator allocator;
+
+            public MemoryStatusThread(InnerAllocator allocator) {
+                super("allocation.logger");
+                this.setDaemon(true);
+                this.allocator = allocator;
+            }
+
+            @Override
+            public void run() {
+                while (true) {
+                    memoryLogger.trace("Memory Usage: \n{}", allocator);
+                    try {
+                        Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                }
+            }
+        }
+
+    }
+}
diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
new file mode 100644
index 0000000000..07f0685bc9
--- /dev/null
+++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
+ * to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+    public final long id = ID_GENERATOR.incrementAndGet();
+    private final AbstractByteBuf wrapped;
+    private final long memoryAddress;
+
+    UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+        this(buf, true);
+    }
+
+    UnsafeDirectLittleEndian(LargeBuffer buf) {
+        this(buf, true);
+    }
+
+    UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+        this(buf, true);
+    }
+
+    private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+        super(buf);
+
+        this.wrapped = buf;
+        this.memoryAddress = buf.memoryAddress();
+    }
+
+    private long addr(int index) {
+        return memoryAddress + index;
+    }
+
+    @Override
+    public long getLong(int index) {
+        // Reads directly at memoryAddress + index. Unlike setLong, no wrapped.checkIndex call
+        // is made here, so the index is not validated.
+        return PlatformDependent.getLong(addr(index));
+    }
+
+    @Override
+    public float getFloat(int index) {
+        return Float.intBitsToFloat(getInt(index));
+    }
+
+    @Override
+    public ByteBuf slice() {
+        return slice(this.readerIndex(), readableBytes());
+    }
+
+    @Override
+    public ByteBuf slice(int index, int length) {
+        return new SlicedByteBuf(this, index, length);
+    }
+
+    @Override
+    public ByteBuf order(ByteOrder endianness) {
+        return this;
+    }
+
+    @Override
+    public double getDouble(int index) {
+        return Double.longBitsToDouble(getLong(index));
+    }
+
+    @Override
+    public char getChar(int index) {
+        return (char) getShort(index);
+    }
+
+    @Override
+    public long getUnsignedInt(int index) {
+        return getInt(index) & 0xFFFFFFFFL;
+    }
+
+    @Override
+    public int getInt(int index) {
+        // Reads directly at memoryAddress + index; the index is not validated.
+        return PlatformDependent.getInt(addr(index));
+    }
+
+    @Override
+    public int getUnsignedShort(int index) {
+        return getShort(index) & 0xFFFF;
+    }
+
+    @Override
+    public short getShort(int index) {
+        // Reads directly at memoryAddress + index; the index is not validated.
+        return PlatformDependent.getShort(addr(index));
+    }
+
+    @Override
+    public ByteBuf setShort(int index, int value) {
+        wrapped.checkIndex(index, 2);
+        setShort_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setInt(int index, int value) {
+        wrapped.checkIndex(index, 4);
+        setInt_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setLong(int index, long value) {
+        wrapped.checkIndex(index, 8);
+        setLong_(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setChar(int index, int value) {
+        setShort(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setFloat(int index, float value) {
+        setInt(index, Float.floatToRawIntBits(value));
+        return this;
+    }
+
+    @Override
+    public ByteBuf setDouble(int index, double value) {
+        setLong(index, Double.doubleToRawLongBits(value));
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeShort(int value) {
+        wrapped.ensureWritable(2);
+        setShort_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 2;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeInt(int value) {
+        wrapped.ensureWritable(4);
+        setInt_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 4;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeLong(long value) {
+        wrapped.ensureWritable(8);
+        setLong_(wrapped.writerIndex, value);
+        wrapped.writerIndex += 8;
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeChar(int value) {
+        writeShort(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeFloat(float value) {
+        writeInt(Float.floatToRawIntBits(value));
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeDouble(double value) {
+        writeLong(Double.doubleToRawLongBits(value));
+        return this;
+    }
+
+    private void setShort_(int index, int value) {
+        PlatformDependent.putShort(addr(index), (short) value);
+    }
+
+    private void setInt_(int index, int value) {
+        PlatformDependent.putInt(addr(index), value);
+    }
+
+    private void setLong_(int index, long value) {
+        PlatformDependent.putLong(addr(index), value);
+    }
+
+    @Override
+    public byte getByte(int index) {
+        return PlatformDependent.getByte(addr(index));
+    }
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+        PlatformDependent.putByte(addr(index), (byte) value);
+        return this;
+    }
+
+    @Override
+    public boolean release() {
+        return release(1);
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        wrapped.checkIndex(index, length);
+        byte[] tmp = new byte[length];
+        int readBytes = in.read(tmp);
+        if (readBytes > 0) {
+            PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
+        }
+        return readBytes;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        wrapped.checkIndex(index, length);
+        if (length != 0) {
+            byte[] tmp = new byte[length];
+            PlatformDependent.copyMemory(addr(index), tmp, 0, length);
+            out.write(tmp);
+        }
+        return this;
+    }
+
+    @Override
+    public int hashCode() {
+        return System.identityHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj;
+    }
+}