You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/03/17 19:55:43 UTC

[qpid-protonj2] branch main updated: PROTON-2693 Allow access to native addresses for wrapped buffers

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new c01b46ff PROTON-2693 Allow access to native addresses for wrapped buffers
c01b46ff is described below

commit c01b46ffb850ca2e6d5edce781c574ffabe14b98
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri Mar 17 15:55:31 2023 -0400

    PROTON-2693 Allow access to native addresses for wrapped buffers
    
    When accessing the buffer component add API to allow access to the
    native address and the offset read or write position for those buffers
    that expose native (direct) buffers.
---
 .../protonj2/buffer/ProtonBufferComponent.java     |  27 ++++++
 .../buffer/ProtonBufferComponentAccessor.java      | 103 +++++++++++++++++----
 .../buffer/impl/ProtonByteArrayBuffer.java         |  15 +++
 .../buffer/impl/ProtonCompositeBufferImpl.java     |  15 +++
 .../buffer/netty/Netty4ToProtonBufferAdapter.java  |  15 +++
 .../buffer/netty/Netty5ToProtonBufferAdapter.java  |  30 ++++++
 .../buffer/netty/ProtonBufferToNetty5Adapter.java  |   6 +-
 .../buffer/impl/ProtonByteArrayBufferTest.java     |  24 +++++
 .../buffer/impl/ProtonCompositeBufferImplTest.java |  54 +++++++++++
 .../netty/Netty5ProtonBufferAdapterTest.java       |  20 ++++
 10 files changed, 288 insertions(+), 21 deletions(-)

diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponent.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponent.java
index 5bec3bde..2e095c9b 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponent.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponent.java
@@ -165,4 +165,31 @@ public interface ProtonBufferComponent {
      */
     public ProtonBufferIterator bufferIterator();
 
+    /**
+     * Gets the native address of the backing buffer if one exists otherwise returns 0.
+     * <p>
+     * The returned address is the base address for the memory region that backs this buffer, which
+     * should not be used for reads and writes as the backing region could be larger and not all portions
+     * of the backing region might be owned by this buffer component. The caller should use the methods
+     * {@link #getNativeReadAddress()} and {@link #getNativeWriteAddress()}, which each use this method
+     * to compute where these values actually fall in the native memory region.
+     *
+     * @return The native memory address for the region backing this buffer, if any, otherwise 0.
+     */
+    long getNativeAddress();
+
+    /**
+     * Gets the native address where reads from this buffer component should start.
+     *
+     * @return The native memory address where reads start, if any, otherwise 0.
+     */
+    long getNativeReadAddress();
+
+    /**
+     * Gets the native address where writes to this buffer component should start.
+     *
+     * @return The native memory address where writing should start, if any, otherwise 0.
+     */
+    long getNativeWriteAddress();
+
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponentAccessor.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponentAccessor.java
index 1b1c552a..71813496 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponentAccessor.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/ProtonBufferComponentAccessor.java
@@ -33,7 +33,8 @@ import java.util.NoSuchElementException;
  * been discarded.
  * <p>
  * The general usage of the component access object should be within a try-with-resource
- * block as follows:
+ * block as follows, although it should be noted that the iteration style of component
+ * walk shown here allocates both an {@link Iterable} and an {@link Iterator}:
  * <pre>{@code
  *   try (ProtonBufferComponentAccessor accessor = buffer.componentAccessor()) {
  *      for (ProtonBufferComponent component : accessor.readableComponents()) {
@@ -86,10 +87,10 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
     }
 
     /**
-     * Returns the first readable component that this access object provides which resets the
+     * Returns the first writable component that this access object provides which resets the
      * iteration state to the beginning.
      *
-     * @return the first readable component in the sequence of {@link ProtonBufferComponent} instance.
+     * @return the first writable component in the sequence of {@link ProtonBufferComponent} instances.
      */
     default ProtonBufferComponent firstWritable() {
         final ProtonBufferComponent current = first();
@@ -123,13 +124,53 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
         return nextWritableComponent(this);
     }
 
+    /**
+     * @return an {@link Iterable} instance over all the buffer components this instance can reach
+     */
+    default Iterable<ProtonBufferComponent> components() {
+        return new Iterable<ProtonBufferComponent>() {
+
+            @Override
+            public Iterator<ProtonBufferComponent> iterator() {
+                return componentIterator();
+            }
+        };
+    }
+
+    /**
+     * @return an {@link Iterable} instance over all the readable buffer components this instance can reach
+     */
+    default Iterable<ProtonBufferComponent> readableComponents() {
+        return new Iterable<ProtonBufferComponent>() {
+
+            @Override
+            public Iterator<ProtonBufferComponent> iterator() {
+                return readableComponentIterator();
+            }
+        };
+    }
+
+    /**
+     * @return an {@link Iterable} instance over all the writable buffer components this instance can reach
+     */
+    default Iterable<ProtonBufferComponent> writableComponents() {
+        return new Iterable<ProtonBufferComponent>() {
+
+            @Override
+            public Iterator<ProtonBufferComponent> iterator() {
+                return writableComponentIterator();
+            }
+        };
+    }
+
     /**
      * @return an {@link Iterator} that traverses all components within the {@link ProtonBuffer}
      */
     default Iterator<ProtonBufferComponent> componentIterator() {
         return new Iterator<ProtonBufferComponent>() {
 
-            private ProtonBufferComponent next = first();
+            private boolean initialized;
+            private ProtonBufferComponent next;
 
             @Override
             public boolean hasNext() {
@@ -138,14 +179,22 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
 
             @Override
             public ProtonBufferComponent next() {
-                if (next == null) {
+                if (next == null && initialized) {
                     throw new NoSuchElementException();
                 }
 
-                ProtonBufferComponent oldNext = next;
-                next = ProtonBufferComponentAccessor.this.next();
+                if (!initialized) {
+                    next = ProtonBufferComponentAccessor.this.first();
+                    initialized = true;
+                } else {
+                    next = ProtonBufferComponentAccessor.this.next();
+                }
 
-                return oldNext;
+                if (next == null) {
+                    throw new NoSuchElementException();
+                }
+
+                return next;
             }
         };
     }
@@ -156,7 +205,8 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
     default Iterator<ProtonBufferComponent> readableComponentIterator() {
         return new Iterator<ProtonBufferComponent>() {
 
-            private ProtonBufferComponent next = firstReadable();
+            private boolean initialized;
+            private ProtonBufferComponent next;
 
             @Override
             public boolean hasNext() {
@@ -165,14 +215,22 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
 
             @Override
             public ProtonBufferComponent next() {
-                if (next == null) {
+                if (next == null && initialized) {
                     throw new NoSuchElementException();
                 }
 
-                ProtonBufferComponent oldNext = next;
-                next = ProtonBufferComponentAccessor.this.nextReadable();
+                if (!initialized) {
+                    next = ProtonBufferComponentAccessor.this.firstReadable();
+                    initialized = true;
+                } else {
+                    next = ProtonBufferComponentAccessor.this.nextReadable();
+                }
+
+                if (next == null) {
+                    throw new NoSuchElementException();
+                }
 
-                return oldNext;
+                return next;
             }
         };
     }
@@ -183,7 +241,8 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
     default Iterator<ProtonBufferComponent> writableComponentIterator() {
         return new Iterator<ProtonBufferComponent>() {
 
-            private ProtonBufferComponent next = firstWritable();
+            private boolean initialized;
+            private ProtonBufferComponent next;
 
             @Override
             public boolean hasNext() {
@@ -192,14 +251,22 @@ public interface ProtonBufferComponentAccessor extends AutoCloseable {
 
             @Override
             public ProtonBufferComponent next() {
-                if (next == null) {
+                if (next == null && initialized) {
                     throw new NoSuchElementException();
                 }
 
-                ProtonBufferComponent oldNext = next;
-                next = ProtonBufferComponentAccessor.this.nextWritable();
+                if (!initialized) {
+                    next = ProtonBufferComponentAccessor.this.firstWritable();
+                    initialized = true;
+                } else {
+                    next = ProtonBufferComponentAccessor.this.nextWritable();
+                }
+
+                if (next == null) {
+                    throw new NoSuchElementException();
+                }
 
-                return oldNext;
+                return next;
             }
         };
     }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBuffer.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBuffer.java
index 22a36be5..5bdc422e 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBuffer.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBuffer.java
@@ -868,6 +868,21 @@ public final class ProtonByteArrayBuffer extends SharedResource<ProtonBuffer> im
         }
     }
 
+    @Override
+    public long getNativeAddress() {
+        return 0;
+    }
+
+    @Override
+    public long getNativeReadAddress() {
+        return 0;
+    }
+
+    @Override
+    public long getNativeWriteAddress() {
+        return 0;
+    }
+
     //----- Buffer search API
 
     @Override
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
index 5a11262d..0f567fb0 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
@@ -2211,6 +2211,21 @@ public final class ProtonCompositeBufferImpl extends SharedResource<ProtonBuffer
             return currentComponent.getWritableBuffer();
         }
 
+        @Override
+        public long getNativeAddress() {
+            return currentComponent.getNativeAddress();
+        }
+
+        @Override
+        public long getNativeReadAddress() {
+            return currentComponent.getNativeReadAddress();
+        }
+
+        @Override
+        public long getNativeWriteAddress() {
+            return currentComponent.getNativeWriteAddress();
+        }
+
         @Override
         public ProtonBufferIterator bufferIterator() {
             return currentComponent.bufferIterator();
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty4ToProtonBufferAdapter.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty4ToProtonBufferAdapter.java
index 5776cecb..7cd2bfc5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty4ToProtonBufferAdapter.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty4ToProtonBufferAdapter.java
@@ -858,6 +858,21 @@ public final class Netty4ToProtonBufferAdapter extends SharedResource<ProtonBuff
         }
     }
 
+    @Override
+    public long getNativeAddress() {
+        return 0;
+    }
+
+    @Override
+    public long getNativeReadAddress() {
+        return 0;
+    }
+
+    @Override
+    public long getNativeWriteAddress() {
+        return 0;
+    }
+
     //----- Buffer Iteration API
 
     @Override
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty5ToProtonBufferAdapter.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty5ToProtonBufferAdapter.java
index dc954df9..96dd8c99 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty5ToProtonBufferAdapter.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/Netty5ToProtonBufferAdapter.java
@@ -659,6 +659,21 @@ public final class Netty5ToProtonBufferAdapter extends SharedResource<ProtonBuff
         return resourceComponent.writableBuffer();
     }
 
+    @Override
+    public long getNativeAddress() {
+        return resourceComponent.baseNativeAddress();
+    }
+
+    @Override
+    public long getNativeReadAddress() {
+        return resourceComponent.readableNativeAddress();
+    }
+
+    @Override
+    public long getNativeWriteAddress() {
+        return resourceComponent.writableNativeAddress();
+    }
+
     //----- Buffer Iteration API
 
     @Override
@@ -914,6 +929,21 @@ public final class Netty5ToProtonBufferAdapter extends SharedResource<ProtonBuff
             return currentComponent.writableBuffer();
         }
 
+        @Override
+        public long getNativeAddress() {
+            return currentComponent.baseNativeAddress();
+        }
+
+        @Override
+        public long getNativeReadAddress() {
+            return currentComponent.readableNativeAddress();
+        }
+
+        @Override
+        public long getNativeWriteAddress() {
+            return currentComponent.writableNativeAddress();
+        }
+
         @Override
         public ProtonBufferIterator bufferIterator() {
             return new Netty5ToProtonBufferIterator(currentComponent.openCursor());
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/ProtonBufferToNetty5Adapter.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/ProtonBufferToNetty5Adapter.java
index e44f8b3e..2125e7a3 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/ProtonBufferToNetty5Adapter.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/netty/ProtonBufferToNetty5Adapter.java
@@ -1018,17 +1018,17 @@ public class ProtonBufferToNetty5Adapter implements Buffer {
 
         @Override
         public long baseNativeAddress() {
-            return 0;
+            return current.getNativeAddress();
         }
 
         @Override
         public long readableNativeAddress() {
-            return 0;
+            return current.getNativeReadAddress();
         }
 
         @Override
         public long writableNativeAddress() {
-            return 0;
+            return current.getNativeWriteAddress();
         }
 
         @Override
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBufferTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBufferTest.java
index dd969a19..1832f389 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBufferTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBufferTest.java
@@ -17,8 +17,14 @@
 
 package org.apache.qpid.protonj2.buffer.impl;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import org.apache.qpid.protonj2.buffer.ProtonAbstractBufferTest;
+import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponent;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponentAccessor;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test the byte array backed proton buffer
@@ -29,4 +35,22 @@ public class ProtonByteArrayBufferTest extends ProtonAbstractBufferTest {
     public ProtonBufferAllocator createTestCaseAllocator() {
         return new ProtonByteArrayBufferAllocator();
     }
+
+    @Test
+    public void testBufferExposesNativeAddressValues() {
+        try (ProtonBufferAllocator allocator = createTestCaseAllocator();
+             ProtonBuffer buffer = allocator.allocate(16)) {
+
+            buffer.writeLong(Long.MAX_VALUE);
+            buffer.readByte();
+
+            try (ProtonBufferComponentAccessor accessor = buffer.componentAccessor()) {
+                for (ProtonBufferComponent component : accessor.components()) {
+                    assertEquals(0, component.getNativeAddress());
+                    assertEquals(0, component.getNativeReadAddress());
+                    assertEquals(0, component.getNativeWriteAddress());
+                }
+            }
+        }
+    }
 }
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
index 0dac9ecf..c404211b 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
@@ -35,11 +35,16 @@ import org.apache.qpid.protonj2.buffer.ProtonAbstractBufferTest;
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
 import org.apache.qpid.protonj2.buffer.ProtonBufferClosedException;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponent;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponentAccessor;
 import org.apache.qpid.protonj2.buffer.ProtonBufferReadOnlyException;
 import org.apache.qpid.protonj2.buffer.ProtonCompositeBuffer;
+import org.apache.qpid.protonj2.buffer.netty.Netty5ProtonBufferAllocator;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import io.netty5.buffer.BufferAllocator;
+
 /**
  * Tests for the proton composite buffer implementation.
  */
@@ -1916,4 +1921,53 @@ public class ProtonCompositeBufferImplTest extends ProtonAbstractBufferTest {
             assertEquals(totalCapacity - splitPoint, composite.getWriteOffset());
         }
     }
+
+    @Test
+    public void testBufferExposesNativeAddressValues() {
+        try (ProtonBufferAllocator allocator = new Netty5ProtonBufferAllocator(BufferAllocator.offHeapUnpooled());
+             ProtonBuffer buffer = allocator.allocate(16)) {
+
+            buffer.writeLong(Long.MAX_VALUE);
+            buffer.readByte();
+
+            try (ProtonBufferComponentAccessor accessor = buffer.componentAccessor()) {
+                for (ProtonBufferComponent component : accessor.components()) {
+                    assertTrue(component.getNativeAddress() != 0);
+                    assertTrue(component.getNativeReadAddress() != 0);
+                    assertTrue(component.getNativeWriteAddress() != 0);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBufferExposesNativeAddressValuesForNativeBackedBuffers() {
+        try (ProtonBufferAllocator offHeapAllocator = new Netty5ProtonBufferAllocator(BufferAllocator.offHeapUnpooled());
+             ProtonBufferAllocator onHeapAllocator = createProtonDefaultAllocator();
+             ProtonCompositeBuffer buffer = onHeapAllocator.composite()) {
+
+            buffer.append(offHeapAllocator.allocate(16));
+            buffer.append(onHeapAllocator.allocate(16));
+
+            buffer.writeLong(Long.MAX_VALUE);
+            buffer.writeLong(Long.MAX_VALUE);
+            buffer.readByte();
+
+            int count = 0;
+
+            try (ProtonBufferComponentAccessor accessor = buffer.componentAccessor()) {
+                for (ProtonBufferComponent component : accessor.components()) {
+                    if (count++ == 0) {
+                        assertTrue(component.getNativeAddress() != 0);
+                        assertTrue(component.getNativeReadAddress() != 0);
+                        assertTrue(component.getNativeWriteAddress() != 0);
+                    } else {
+                        assertTrue(component.getNativeAddress() == 0);
+                        assertTrue(component.getNativeReadAddress() == 0);
+                        assertTrue(component.getNativeWriteAddress() == 0);
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/netty/Netty5ProtonBufferAdapterTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/netty/Netty5ProtonBufferAdapterTest.java
index 11c5992d..3d94ce1a 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/netty/Netty5ProtonBufferAdapterTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/netty/Netty5ProtonBufferAdapterTest.java
@@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponent;
+import org.apache.qpid.protonj2.buffer.ProtonBufferComponentAccessor;
 import org.junit.jupiter.api.Test;
 
 import io.netty5.buffer.Buffer;
@@ -37,6 +39,24 @@ public class Netty5ProtonBufferAdapterTest extends NettyBufferAdapterTestBase {
         return new Netty5ProtonBufferAllocator(BufferAllocator.onHeapUnpooled());
     }
 
+    @Test
+    public void testBufferExposesNativeAddressValues() {
+        try (ProtonBufferAllocator netty = new Netty5ProtonBufferAllocator(BufferAllocator.offHeapUnpooled());
+             ProtonBuffer nettyBuffer = netty.allocate(16)) {
+
+            nettyBuffer.writeLong(Long.MAX_VALUE);
+            nettyBuffer.readByte();
+
+            try (ProtonBufferComponentAccessor accessor = nettyBuffer.componentAccessor()) {
+                for (ProtonBufferComponent component : accessor.components()) {
+                    assertTrue(component.getNativeAddress() != 0);
+                    assertTrue(component.getNativeReadAddress() != 0);
+                    assertTrue(component.getNativeWriteAddress() != 0);
+                }
+            }
+        }
+    }
+
     @Test
     public void testBufferCloseReleasesBuffer() {
         try (ProtonBufferAllocator netty = createTestCaseAllocator();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org