You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/01/09 15:09:23 UTC

[incubator-plc4x] 03/04: PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP - Initial commit of this effort - Added a new module hierarchy for utility modules

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit cbcd7e4e8562fbd99d10b11e98bc59339d1fd15a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Jan 5 09:18:15 2018 +0100

    PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP  - Initial commit of this effort - Added a new module hierarchy for utility modules
---
 plc4j/pom.xml                                      |    2 +-
 .../rawsockets/netty/AbstractRawSocketChannel.java |  740 ++++++++++++++
 .../netty/AbstractRawSocketStreamChannel.java      | 1042 ++++++++++++++++++++
 .../utils/rawsockets/netty/RawSocketChannel.java   |  214 ++--
 .../rawsockets/netty/RawSocketChannelConfig.java   |  187 ++++
 ...SocketChannel.java => RawSocketChannelSav.java} |    4 +-
 .../rawsockets/netty/RawSocketEventArray.java      |  104 ++
 .../utils/rawsockets/netty/RawSocketEventLoop.java |  449 +++++++++
 8 files changed, 2598 insertions(+), 144 deletions(-)

diff --git a/plc4j/pom.xml b/plc4j/pom.xml
index e9932cb..62d0025 100644
--- a/plc4j/pom.xml
+++ b/plc4j/pom.xml
@@ -39,7 +39,7 @@
     <module>api</module>
     <module>core</module>
     <module>protocols</module>
-    <module>utils</module>
+    <!--module>utils</module-->
   </modules>
 
   <dependencies>
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
new file mode 100644
index 0000000..a8762ad
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
@@ -0,0 +1,740 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.channel.socket.ChannelInputShutdownReadComplete;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.Socket;
+import io.netty.channel.unix.UnixChannel;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ThrowableUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+abstract class AbstractRawSocketChannel extends AbstractChannel implements Channel {
+    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new ClosedChannelException(), AbstractRawSocketChannel.class, "doClose()");
+    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
+    private final int readFlag;
+    final LinuxSocket socket;
+    /**
+     * The future of the current connection attempt.  If not null, subsequent
+     * connection attempts will fail.
+     */
+    private ChannelPromise connectPromise;
+    private ScheduledFuture<?> connectTimeoutFuture;
+    private SocketAddress requestedRemoteAddress;
+
+    private volatile SocketAddress local;
+    private volatile SocketAddress remote;
+
+    protected int flags = Native.EPOLLET;
+    boolean inputClosedSeenErrorOnRead;
+    boolean epollInReadyRunnablePending;
+
+    protected volatile boolean active;
+
+    AbstractRawSocketChannel(LinuxSocket fd, int flag) {
+        this(null, fd, flag, false);
+    }
+
+    AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, boolean active) {
+        super(parent);
+        socket = checkNotNull(fd, "fd");
+        readFlag = flag;
+        flags |= flag;
+        this.active = active;
+        if (active) {
+            // Directly cache the remote and local addresses
+            // See https://github.com/netty/netty/issues/2359
+            local = fd.localAddress();
+            remote = fd.remoteAddress();
+        }
+    }
+
+    AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) {
+        super(parent);
+        socket = checkNotNull(fd, "fd");
+        readFlag = flag;
+        flags |= flag;
+        active = true;
+        // Directly cache the remote and local addresses
+        // See https://github.com/netty/netty/issues/2359
+        this.remote = remote;
+        local = fd.localAddress();
+    }
+
+    static boolean isSoErrorZero(Socket fd) {
+        try {
+            return fd.getSoError() == 0;
+        } catch (IOException e) {
+            throw new ChannelException(e);
+        }
+    }
+
+    void setFlag(int flag) throws IOException {
+        if (!isFlagSet(flag)) {
+            flags |= flag;
+            modifyEvents();
+        }
+    }
+
+    void clearFlag(int flag) throws IOException {
+        if (isFlagSet(flag)) {
+            flags &= ~flag;
+            modifyEvents();
+        }
+    }
+
+    boolean isFlagSet(int flag) {
+        return (flags & flag) != 0;
+    }
+
+    @Override
+    public final FileDescriptor fd() {
+        return socket;
+    }
+
+    @Override
+    public abstract RawSocketChannelConfig config();
+
+    @Override
+    public boolean isActive() {
+        return active;
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return METADATA;
+    }
+
+    @Override
+    protected void doClose() throws Exception {
+        active = false;
+        // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
+        // socket which has not even been connected yet. This has been observed to block during unit tests.
+        inputClosedSeenErrorOnRead = true;
+        try {
+            ChannelPromise promise = connectPromise;
+            if (promise != null) {
+                // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+                promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
+                connectPromise = null;
+            }
+
+            ScheduledFuture<?> future = connectTimeoutFuture;
+            if (future != null) {
+                future.cancel(false);
+                connectTimeoutFuture = null;
+            }
+
+            if (isRegistered()) {
+                // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
+                // if SO_LINGER is used.
+                //
+                // See https://github.com/netty/netty/issues/7159
+                EventLoop loop = eventLoop();
+                if (loop.inEventLoop()) {
+                    doDeregister();
+                } else {
+                    loop.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                doDeregister();
+                            } catch (Throwable cause) {
+                                pipeline().fireExceptionCaught(cause);
+                            }
+                        }
+                    });
+                }
+            }
+        } finally {
+            socket.close();
+        }
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+        doClose();
+    }
+
+    @Override
+    protected boolean isCompatible(EventLoop loop) {
+        return loop instanceof RawSocketEventLoop;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socket.isOpen();
+    }
+
+    @Override
+    protected void doDeregister() throws Exception {
+        ((RawSocketEventLoop) eventLoop()).remove(this);
+    }
+
+    @Override
+    protected final void doBeginRead() throws Exception {
+        // Channel.read() or ChannelHandlerContext.read() was called
+        final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+        unsafe.readPending = true;
+
+        // We must set the read flag here as it is possible the user didn't read in the last read loop, the
+        // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
+        // never get data after this.
+        setFlag(readFlag);
+
+        // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
+        // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
+        if (unsafe.maybeMoreDataToRead) {
+            unsafe.executeEpollInReadyRunnable(config());
+        }
+    }
+
+    final boolean shouldBreakEpollInReady(ChannelConfig config) {
+        return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
+    }
+
+    private static boolean isAllowHalfClosure(ChannelConfig config) {
+        return config instanceof RawSocketSocketChannelConfig &&
+                ((RawSocketSocketChannelConfig) config).isAllowHalfClosure();
+    }
+
+    final void clearEpollIn() {
+        // Only clear if registered with an EventLoop as otherwise
+        if (isRegistered()) {
+            final EventLoop loop = eventLoop();
+            final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+            if (loop.inEventLoop()) {
+                unsafe.clearEpollIn0();
+            } else {
+                // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
+                loop.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        if (!unsafe.readPending && !config().isAutoRead()) {
+                            // Still no read triggered so clear it now
+                            unsafe.clearEpollIn0();
+                        }
+                    }
+                });
+            }
+        } else  {
+            // The EventLoop is not registered atm so just update the flags so the correct value
+            // will be used once the channel is registered
+            flags &= ~readFlag;
+        }
+    }
+
+    private void modifyEvents() throws IOException {
+        if (isOpen() && isRegistered()) {
+            ((RawSocketEventLoop) eventLoop()).modify(this);
+        }
+    }
+
+    @Override
+    protected void doRegister() throws Exception {
+        // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
+        // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
+        // new EventLoop.
+        epollInReadyRunnablePending = false;
+        ((RawSocketEventLoop) eventLoop()).add(this);
+    }
+
+    @Override
+    protected abstract AbstractEpollUnsafe newUnsafe();
+
+    /**
+     * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
+     */
+    protected final ByteBuf newDirectBuffer(ByteBuf buf) {
+        return newDirectBuffer(buf, buf);
+    }
+
+    /**
+     * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
+     * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
+     * this method.
+     */
+    protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
+        final int readableBytes = buf.readableBytes();
+        if (readableBytes == 0) {
+            ReferenceCountUtil.release(holder);
+            return Unpooled.EMPTY_BUFFER;
+        }
+
+        final ByteBufAllocator alloc = alloc();
+        if (alloc.isDirectBufferPooled()) {
+            return newDirectBuffer0(holder, buf, alloc, readableBytes);
+        }
+
+        final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
+        if (directBuf == null) {
+            return newDirectBuffer0(holder, buf, alloc, readableBytes);
+        }
+
+        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
+        ReferenceCountUtil.safeRelease(holder);
+        return directBuf;
+    }
+
+    private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
+        final ByteBuf directBuf = alloc.directBuffer(capacity);
+        directBuf.writeBytes(buf, buf.readerIndex(), capacity);
+        ReferenceCountUtil.safeRelease(holder);
+        return directBuf;
+    }
+
+    protected static void checkResolvable(InetSocketAddress addr) {
+        if (addr.isUnresolved()) {
+            throw new UnresolvedAddressException();
+        }
+    }
+
+    /**
+     * Read bytes into the given {@link ByteBuf} and return the amount.
+     */
+    protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
+        int writerIndex = byteBuf.writerIndex();
+        int localReadAmount;
+        unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
+        if (byteBuf.hasMemoryAddress()) {
+            localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
+        } else {
+            ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
+            localReadAmount = socket.read(buf, buf.position(), buf.limit());
+        }
+        if (localReadAmount > 0) {
+            byteBuf.writerIndex(writerIndex + localReadAmount);
+        }
+        return localReadAmount;
+    }
+
+    protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+        if (buf.hasMemoryAddress()) {
+            int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
+            if (localFlushedAmount > 0) {
+                in.removeBytes(localFlushedAmount);
+                return 1;
+            }
+        } else {
+            final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
+                    buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
+            int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
+            if (localFlushedAmount > 0) {
+                nioBuf.position(nioBuf.position() + localFlushedAmount);
+                in.removeBytes(localFlushedAmount);
+                return 1;
+            }
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
+        boolean readPending;
+        boolean maybeMoreDataToRead;
+        private RawSocketRecvByteAllocatorHandle allocHandle;
+        private final Runnable epollInReadyRunnable = new Runnable() {
+            @Override
+            public void run() {
+                epollInReadyRunnablePending = false;
+                epollInReady();
+            }
+        };
+
+        /**
+         * Called once EPOLLIN event is ready to be processed
+         */
+        abstract void epollInReady();
+
+        final void epollInBefore() { maybeMoreDataToRead = false; }
+
+        final void epollInFinally(ChannelConfig config) {
+            maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
+            // Check if there is a readPending which was not processed yet.
+            // This could be for two reasons:
+            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
+            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
+            //
+            // See https://github.com/netty/netty/issues/2254
+            if (!readPending && !config.isAutoRead()) {
+                clearEpollIn();
+            } else if (readPending && maybeMoreDataToRead) {
+                // trigger a read again as there may be something left to read and because of epoll ET we
+                // will not get notified again until we read everything from the socket
+                //
+                // It is possible the last fireChannelRead call could cause the user to call read() again, or if
+                // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
+                // to false before every read operation to prevent re-entry into epollInReady() we will not read from
+                // the underlying OS again unless the user happens to call read again.
+                executeEpollInReadyRunnable(config);
+            }
+        }
+
+        final void executeEpollInReadyRunnable(ChannelConfig config) {
+            if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
+                return;
+            }
+            epollInReadyRunnablePending = true;
+            eventLoop().execute(epollInReadyRunnable);
+        }
+
+        /**
+         * Called once EPOLLRDHUP event is ready to be processed
+         */
+        final void epollRdHupReady() {
+            // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
+            recvBufAllocHandle().receivedRdHup();
+
+            if (isActive()) {
+                // If it is still active, we need to call epollInReady as otherwise we may miss to
+                // read pending data from the underlying file descriptor.
+                // See https://github.com/netty/netty/issues/3709
+                epollInReady();
+            } else {
+                // Just to be safe make sure the input marked as closed.
+                shutdownInput(true);
+            }
+
+            // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
+            clearEpollRdHup();
+        }
+
+        /**
+         * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
+         */
+        private void clearEpollRdHup() {
+            try {
+                clearFlag(Native.EPOLLRDHUP);
+            } catch (IOException e) {
+                pipeline().fireExceptionCaught(e);
+                close(voidPromise());
+            }
+        }
+
+        /**
+         * Shutdown the input side of the channel.
+         */
+        void shutdownInput(boolean rdHup) {
+            if (!socket.isInputShutdown()) {
+                if (isAllowHalfClosure(config())) {
+                    try {
+                        socket.shutdown(true, false);
+                    } catch (IOException ignored) {
+                        // We attempted to shutdown and failed, which means the input has already effectively been
+                        // shutdown.
+                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
+                        return;
+                    } catch (NotYetConnectedException ignore) {
+                        // We attempted to shutdown and failed, which means the input has already effectively been
+                        // shutdown.
+                    }
+                    clearEpollIn();
+                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
+                } else {
+                    close(voidPromise());
+                }
+            } else if (!rdHup) {
+                inputClosedSeenErrorOnRead = true;
+                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
+            }
+        }
+
+        private void fireEventAndClose(Object evt) {
+            pipeline().fireUserEventTriggered(evt);
+            close(voidPromise());
+        }
+
+        @Override
+        public RawSocketRecvByteAllocatorHandle recvBufAllocHandle() {
+            if (allocHandle == null) {
+                allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
+            }
+            return allocHandle;
+        }
+
+        /**
+         * Create a new {@link RawSocketRecvByteAllocatorHandle} instance.
+         * @param handle The handle to wrap with EPOLL specific logic.
+         */
+        RawSocketRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+            return new RawSocketRecvByteAllocatorHandle(handle);
+        }
+
+        @Override
+        protected void flush0() {
+            // Flush immediately only when there's no pending flush.
+            // If there's a pending flush operation, event loop will call forceFlush() later,
+            // and thus there's no need to call it now.
+            if (isFlagSet(Native.EPOLLOUT)) {
+                return;
+            }
+            super.flush0();
+        }
+
+        /**
+         * Called once a EPOLLOUT event is ready to be processed
+         */
+        final void epollOutReady() {
+            if (connectPromise != null) {
+                // pending connect which is now complete so handle it.
+                finishConnect();
+            } else if (!socket.isOutputShutdown()) {
+                // directly call super.flush0() to force a flush now
+                super.flush0();
+            }
+        }
+
+        protected final void clearEpollIn0() {
+            assert eventLoop().inEventLoop();
+            try {
+                readPending = false;
+                clearFlag(readFlag);
+            } catch (IOException e) {
+                // When this happens there is something completely wrong with either the filedescriptor or epoll,
+                // so fire the exception through the pipeline and close the Channel.
+                pipeline().fireExceptionCaught(e);
+                unsafe().close(unsafe().voidPromise());
+            }
+        }
+
+        @Override
+        public void connect(
+                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
+            if (!promise.setUncancellable() || !ensureOpen(promise)) {
+                return;
+            }
+
+            try {
+                if (connectPromise != null) {
+                    throw new ConnectionPendingException();
+                }
+
+                boolean wasActive = isActive();
+                if (doConnect(remoteAddress, localAddress)) {
+                    fulfillConnectPromise(promise, wasActive);
+                } else {
+                    connectPromise = promise;
+                    requestedRemoteAddress = remoteAddress;
+
+                    // Schedule connect timeout.
+                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
+                    if (connectTimeoutMillis > 0) {
+                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
+                            @Override
+                            public void run() {
+                                ChannelPromise connectPromise = AbstractRawSocketChannel.this.connectPromise;
+                                ConnectTimeoutException cause =
+                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
+                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
+                                    close(voidPromise());
+                                }
+                            }
+                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
+                    }
+
+                    promise.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            if (future.isCancelled()) {
+                                if (connectTimeoutFuture != null) {
+                                    connectTimeoutFuture.cancel(false);
+                                }
+                                connectPromise = null;
+                                close(voidPromise());
+                            }
+                        }
+                    });
+                }
+            } catch (Throwable t) {
+                closeIfClosed();
+                promise.tryFailure(annotateConnectException(t, remoteAddress));
+            }
+        }
+
+        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
+            if (promise == null) {
+                // Closed via cancellation and the promise has been notified already.
+                return;
+            }
+            active = true;
+
+            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
+            // We still need to ensure we call fireChannelActive() in this case.
+            boolean active = isActive();
+
+            // trySuccess() will return false if a user cancelled the connection attempt.
+            boolean promiseSet = promise.trySuccess();
+
+            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
+            // because what happened is what happened.
+            if (!wasActive && active) {
+                pipeline().fireChannelActive();
+            }
+
+            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
+            if (!promiseSet) {
+                close(voidPromise());
+            }
+        }
+
+        private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
+            if (promise == null) {
+                // Closed via cancellation and the promise has been notified already.
+                return;
+            }
+
+            // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+            promise.tryFailure(cause);
+            closeIfClosed();
+        }
+
+        private void finishConnect() {
+            // Note this method is invoked by the event loop only if the connection attempt was
+            // neither cancelled nor timed out.
+
+            assert eventLoop().inEventLoop();
+
+            boolean connectStillInProgress = false;
+            try {
+                boolean wasActive = isActive();
+                if (!doFinishConnect()) {
+                    connectStillInProgress = true;
+                    return;
+                }
+                fulfillConnectPromise(connectPromise, wasActive);
+            } catch (Throwable t) {
+                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
+            } finally {
+                if (!connectStillInProgress) {
+                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
+                    // See https://github.com/netty/netty/issues/1770
+                    if (connectTimeoutFuture != null) {
+                        connectTimeoutFuture.cancel(false);
+                    }
+                    connectPromise = null;
+                }
+            }
+        }
+
+        /**
+         * Finish the connect
+         */
+        private boolean doFinishConnect() throws Exception {
+            if (socket.finishConnect()) {
+                clearFlag(Native.EPOLLOUT);
+                if (requestedRemoteAddress instanceof InetSocketAddress) {
+                    remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
+                }
+                requestedRemoteAddress = null;
+
+                return true;
+            }
+            setFlag(Native.EPOLLOUT);
+            return false;
+        }
+    }
+
+    @Override
+    protected void doBind(SocketAddress local) throws Exception {
+        if (local instanceof InetSocketAddress) {
+            checkResolvable((InetSocketAddress) local);
+        }
+        socket.bind(local);
+        this.local = socket.localAddress();
+    }
+
+    /**
+     * Connect to the remote peer
+     */
+    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        if (localAddress instanceof InetSocketAddress) {
+            checkResolvable((InetSocketAddress) localAddress);
+        }
+
+        InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
+                ? (InetSocketAddress) remoteAddress : null;
+        if (remoteSocketAddr != null) {
+            checkResolvable(remoteSocketAddr);
+        }
+
+        if (remote != null) {
+            // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
+            // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
+            // later.
+            throw new AlreadyConnectedException();
+        }
+
+        if (localAddress != null) {
+            socket.bind(localAddress);
+        }
+
+        boolean connected = doConnect0(remoteAddress);
+        if (connected) {
+            remote = remoteSocketAddr == null ?
+                    remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
+        }
+        // We always need to set the localAddress even if not connected yet as the bind already took place.
+        //
+        // See https://github.com/netty/netty/issues/3463
+        local = socket.localAddress();
+        return connected;
+    }
+
+    private boolean doConnect0(SocketAddress remote) throws Exception {
+        boolean success = false;
+        try {
+            boolean connected = socket.connect(remote);
+            if (!connected) {
+                setFlag(Native.EPOLLOUT);
+            }
+            success = true;
+            return connected;
+        } finally {
+            if (!success) {
+                doClose();
+            }
+        }
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return local;
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return remote;
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
new file mode 100644
index 0000000..a1f2ff4
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
@@ -0,0 +1,1042 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+import io.netty.channel.internal.ChannelUtils;
+import io.netty.channel.socket.DuplexChannel;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.IovArray;
+import io.netty.channel.unix.SocketWritableByteChannel;
+import io.netty.channel.unix.UnixChannelUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.StringUtil;
+import io.netty.util.internal.ThrowableUtil;
+import io.netty.util.internal.UnstableApi;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.FileDescriptor.pipe;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+public abstract class AbstractRawSocketStreamChannel extends AbstractRawSocketChannel implements DuplexChannel {
+    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
+    private static final String EXPECTED_TYPES =
+            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
+                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRawSocketStreamChannel.class);
+    private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
+            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+                    AbstractRawSocketStreamChannel.class, "clearSpliceQueue()");
+    private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new ClosedChannelException(),
+            AbstractRawSocketStreamChannel.class, "spliceTo(...)");
+    private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
+            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+            AbstractRawSocketStreamChannel.class, "failSpliceIfClosed(...)");
+    private final Runnable flushTask = new Runnable() {
+        @Override
+        public void run() {
+            flush();
+        }
+    };
+    private Queue<SpliceInTask> spliceQueue;
+
+    // Lazy init these if we need to splice(...)
+    private FileDescriptor pipeIn;
+    private FileDescriptor pipeOut;
+
+    private WritableByteChannel byteChannel;
+
+    protected AbstractRawSocketStreamChannel(Channel parent, int fd) {
+        this(parent, new LinuxSocket(fd));
+    }
+
+    protected AbstractRawSocketStreamChannel(int fd) {
+        this(new LinuxSocket(fd));
+    }
+
+    AbstractRawSocketStreamChannel(LinuxSocket fd) {
+        this(fd, isSoErrorZero(fd));
+    }
+
+    AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd) {
+        super(parent, fd, Native.EPOLLIN, true);
+        // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
+        super(parent, fd, Native.EPOLLIN, remote);
+        // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    protected AbstractRawSocketStreamChannel(LinuxSocket fd, boolean active) {
+        super(null, fd, Native.EPOLLIN, active);
+        // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    @Override
+    protected AbstractEpollUnsafe newUnsafe() {
+        return new EpollStreamUnsafe();
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return METADATA;
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+     * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+     * splice until the {@link ChannelFuture} was canceled or it was failed.
+     *
+     * Please note:
+     * <ul>
+     *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+     *   {@link IllegalArgumentException} is thrown. </li>
+     *   <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+     *   target {@link AbstractRawSocketStreamChannel}</li>
+     * </ul>
+     *
+     */
+    public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len) {
+        return spliceTo(ch, len, newPromise());
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+     * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+     * splice until the {@link ChannelFuture} was canceled or it was failed.
+     *
+     * Please note:
+     * <ul>
+     *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+     *   {@link IllegalArgumentException} is thrown. </li>
+     *   <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+     *   target {@link AbstractRawSocketStreamChannel}</li>
+     * </ul>
+     *
+     */
+    public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len,
+                                        final ChannelPromise promise) {
+        if (ch.eventLoop() != eventLoop()) {
+            throw new IllegalArgumentException("EventLoops are not the same.");
+        }
+        if (len < 0) {
+            throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+        }
+        if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
+                || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
+            throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
+        }
+        checkNotNull(promise, "promise");
+        if (!isOpen()) {
+            promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+        } else {
+            addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
+            failSpliceIfClosed(promise);
+        }
+        return promise;
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+     * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+     * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+     * {@link ChannelFuture} was canceled or it was failed.
+     *
+     * Please note:
+     * <ul>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+     *   {@link AbstractRawSocketStreamChannel}</li>
+     *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
+     *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+     * </ul>
+     */
+    public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
+        return spliceTo(ch, offset, len, newPromise());
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+     * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+     * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+     * {@link ChannelFuture} was canceled or it was failed.
+     *
+     * Please note:
+     * <ul>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+     *   {@link AbstractRawSocketStreamChannel}</li>
+     *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
+     *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+     * </ul>
+     */
+    public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
+                                        final ChannelPromise promise) {
+        if (len < 0) {
+            throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+        }
+        if (offset < 0) {
+            throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
+        }
+        if (config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED) {
+            throw new IllegalStateException("spliceTo() supported only when using " + RawSocketMode.LEVEL_TRIGGERED);
+        }
+        checkNotNull(promise, "promise");
+        if (!isOpen()) {
+            promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+        } else {
+            addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
+            failSpliceIfClosed(promise);
+        }
+        return promise;
+    }
+
+    private void failSpliceIfClosed(ChannelPromise promise) {
+        if (!isOpen()) {
+            // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
+            // cases where a future may not be notified otherwise.
+            if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
+                eventLoop().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Call this via the EventLoop as it is a MPSC queue.
+                        clearSpliceQueue();
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
+     * @param in the collection which contains objects to write.
+     * @param buf the {@link ByteBuf} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+        int readableBytes = buf.readableBytes();
+        if (readableBytes == 0) {
+            in.remove();
+            return 0;
+        }
+
+        if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
+            return doWriteBytes(in, buf);
+        } else {
+            ByteBuffer[] nioBuffers = buf.nioBuffers();
+            return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
+                    config().getMaxBytesPerGatheringWrite());
+        }
+    }
+
+    private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
+        // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
+        // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
+        // make a best effort to adjust as OS behavior changes.
+        if (attempted == written) {
+            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
+                config().setMaxBytesPerGatheringWrite(attempted << 1);
+            }
+        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
+            config().setMaxBytesPerGatheringWrite(attempted >>> 1);
+        }
+    }
+
+    /**
+     * Write multiple bytes via {@link IovArray}.
+     * @param in the collection which contains objects to write.
+     * @param array The array which contains the content to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws IOException If an I/O exception occurs during write.
+     */
+    private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
+        final long expectedWrittenBytes = array.size();
+        assert expectedWrittenBytes != 0;
+        final int cnt = array.count();
+        assert cnt != 0;
+
+        final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
+        if (localWrittenBytes > 0) {
+            adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
+            in.removeBytes(localWrittenBytes);
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write multiple bytes via {@link ByteBuffer} array.
+     * @param in the collection which contains objects to write.
+     * @param nioBuffers The buffers to write.
+     * @param nioBufferCnt The number of buffers to write.
+     * @param expectedWrittenBytes The number of bytes we expect to write.
+     * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws IOException If an I/O exception occurs during write.
+     */
+    private int writeBytesMultiple(
+        ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
+        long maxBytesPerGatheringWrite) throws IOException {
+        assert expectedWrittenBytes != 0;
+        if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
+            expectedWrittenBytes = maxBytesPerGatheringWrite;
+        }
+
+        final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
+        if (localWrittenBytes > 0) {
+            adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
+            in.removeBytes(localWrittenBytes);
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write a {@link DefaultFileRegion}
+     * @param in the collection which contains objects to write.
+     * @param region the {@link DefaultFileRegion} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
+        final long regionCount = region.count();
+        if (region.transferred() >= regionCount) {
+            in.remove();
+            return 0;
+        }
+
+        final long offset = region.transferred();
+        final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
+        if (flushedAmount > 0) {
+            in.progress(flushedAmount);
+            if (region.transferred() >= regionCount) {
+                in.remove();
+            }
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write a {@link FileRegion}
+     * @param in the collection which contains objects to write.
+     * @param region the {@link FileRegion} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
+        if (region.transferred() >= region.count()) {
+            in.remove();
+            return 0;
+        }
+
+        if (byteChannel == null) {
+            byteChannel = new RawSocketSocketWritableByteChannel();
+        }
+        final long flushedAmount = region.transferTo(byteChannel, region.transferred());
+        if (flushedAmount > 0) {
+            in.progress(flushedAmount);
+            if (region.transferred() >= region.count()) {
+                in.remove();
+            }
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    @Override
+    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+        int writeSpinCount = config().getWriteSpinCount();
+        do {
+            final int msgCount = in.size();
+            // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
+            if (msgCount > 1 && in.current() instanceof ByteBuf) {
+                writeSpinCount -= doWriteMultiple(in);
+            } else if (msgCount == 0) {
+                // Wrote all messages.
+                clearFlag(Native.EPOLLOUT);
+                // Return here so we not set the EPOLLOUT flag.
+                return;
+            } else {  // msgCount == 1
+                writeSpinCount -= doWriteSingle(in);
+            }
+
+            // We do not break the loop here even if the outbound buffer was flushed completely,
+            // because a user might have triggered another write and flush when we notify his or her
+            // listeners.
+        } while (writeSpinCount > 0);
+
+        if (writeSpinCount == 0) {
+            // We used our writeSpin quantum, and should try to write again later.
+            eventLoop().execute(flushTask);
+        } else {
+            // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
+            // when it can accept more data.
+            setFlag(Native.EPOLLOUT);
+        }
+    }
+
+    /**
+     * Attempt to write a single object.
+     * @param in the collection which contains objects to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws Exception If an I/O error occurs.
+     */
+    protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
+        // The outbound buffer contains only one message or it contains a file region.
+        Object msg = in.current();
+        if (msg instanceof ByteBuf) {
+            return writeBytes(in, (ByteBuf) msg);
+        } else if (msg instanceof DefaultFileRegion) {
+            return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
+        } else if (msg instanceof FileRegion) {
+            return writeFileRegion(in, (FileRegion) msg);
+        } else if (msg instanceof SpliceOutTask) {
+            if (!((SpliceOutTask) msg).spliceOut()) {
+                return WRITE_STATUS_SNDBUF_FULL;
+            }
+            in.remove();
+            return 1;
+        } else {
+            // Should never reach here.
+            throw new Error();
+        }
+    }
+
+    /**
+     * Attempt to write multiple {@link ByteBuf} objects.
+     * @param in the collection which contains objects to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws Exception If an I/O error occurs.
+     */
+    private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
+        final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
+        if (PlatformDependent.hasUnsafe()) {
+            IovArray array = ((RawSocketEventLoop) eventLoop()).cleanArray();
+            array.maxBytes(maxBytesPerGatheringWrite);
+            in.forEachFlushedMessage(array);
+
+            if (array.count() >= 1) {
+                // TODO: Handle the case where cnt == 1 specially.
+                return writeBytesMultiple(in, array);
+            }
+        } else {
+            ByteBuffer[] buffers = in.nioBuffers();
+            int cnt = in.nioBufferCount();
+            if (cnt >= 1) {
+                // TODO: Handle the case where cnt == 1 specially.
+                return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
+            }
+        }
+        // cnt == 0, which means the outbound buffer contained empty buffers only.
+        in.removeBytes(0);
+        return 0;
+    }
+
+    @Override
+    protected Object filterOutboundMessage(Object msg) {
+        if (msg instanceof ByteBuf) {
+            ByteBuf buf = (ByteBuf) msg;
+            return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
+        }
+
+        if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
+            return msg;
+        }
+
+        throw new UnsupportedOperationException(
+                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
+    }
+
+    @UnstableApi
+    @Override
+    protected final void doShutdownOutput() throws Exception {
+        socket.shutdown(false, true);
+    }
+
+    private void shutdownInput0(final ChannelPromise promise) {
+        try {
+            socket.shutdown(true, false);
+            promise.setSuccess();
+        } catch (Throwable cause) {
+            promise.setFailure(cause);
+        }
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+        return socket.isOutputShutdown();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+        return socket.isInputShutdown();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return socket.isShutdown();
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput() {
+        return shutdownOutput(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput(final ChannelPromise promise) {
+        EventLoop loop = eventLoop();
+        if (loop.inEventLoop()) {
+            ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+        } else {
+            loop.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+                }
+            });
+        }
+
+        return promise;
+    }
+
+    @Override
+    public ChannelFuture shutdownInput() {
+        return shutdownInput(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdownInput(final ChannelPromise promise) {
+        Executor closeExecutor = ((RawSocketStreamUnsafe) unsafe()).prepareToClose();
+        if (closeExecutor != null) {
+            closeExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    shutdownInput0(promise);
+                }
+            });
+        } else {
+            EventLoop loop = eventLoop();
+            if (loop.inEventLoop()) {
+                shutdownInput0(promise);
+            } else {
+                loop.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        shutdownInput0(promise);
+                    }
+                });
+            }
+        }
+        return promise;
+    }
+
+    @Override
+    public ChannelFuture shutdown() {
+        return shutdown(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdown(final ChannelPromise promise) {
+        ChannelFuture shutdownOutputFuture = shutdownOutput();
+        if (shutdownOutputFuture.isDone()) {
+            shutdownOutputDone(shutdownOutputFuture, promise);
+        } else {
+            shutdownOutputFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
+                    shutdownOutputDone(shutdownOutputFuture, promise);
+                }
+            });
+        }
+        return promise;
+    }
+
+    private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
+        ChannelFuture shutdownInputFuture = shutdownInput();
+        if (shutdownInputFuture.isDone()) {
+            shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+        } else {
+            shutdownInputFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
+                    shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+                }
+            });
+        }
+    }
+
+    private static void shutdownDone(ChannelFuture shutdownOutputFuture,
+                              ChannelFuture shutdownInputFuture,
+                              ChannelPromise promise) {
+        Throwable shutdownOutputCause = shutdownOutputFuture.cause();
+        Throwable shutdownInputCause = shutdownInputFuture.cause();
+        if (shutdownOutputCause != null) {
+            if (shutdownInputCause != null) {
+                logger.debug("Exception suppressed because a previous exception occurred.",
+                        shutdownInputCause);
+            }
+            promise.setFailure(shutdownOutputCause);
+        } else if (shutdownInputCause != null) {
+            promise.setFailure(shutdownInputCause);
+        } else {
+            promise.setSuccess();
+        }
+    }
+
+    @Override
+    protected void doClose() throws Exception {
+        try {
+            // Calling super.doClose() first so spliceTo(...) will fail on next call.
+            super.doClose();
+        } finally {
+            safeClosePipe(pipeIn);
+            safeClosePipe(pipeOut);
+            clearSpliceQueue();
+        }
+    }
+
+    private void clearSpliceQueue() {
+        if (spliceQueue == null) {
+            return;
+        }
+        for (;;) {
+            SpliceInTask task = spliceQueue.poll();
+            if (task == null) {
+                break;
+            }
+            task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
+        }
+    }
+
+    private static void safeClosePipe(FileDescriptor fd) {
+        if (fd != null) {
+            try {
+                fd.close();
+            } catch (IOException e) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Error while closing a pipe", e);
+                }
+            }
+        }
+    }
+
+    class RawSocketStreamUnsafe extends AbstractRawSocketUnsafe {
+        // Overridden here just to be able to access this method from AbstractRawSocketStreamChannel
+        @Override
+        protected Executor prepareToClose() {
+            return super.prepareToClose();
+        }
+
+        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
+                                         RawSocketRecvByteAllocatorHandle allocHandle) {
+            if (byteBuf != null) {
+                if (byteBuf.isReadable()) {
+                    readPending = false;
+                    pipeline.fireChannelRead(byteBuf);
+                } else {
+                    byteBuf.release();
+                }
+            }
+            allocHandle.readComplete();
+            pipeline.fireChannelReadComplete();
+            pipeline.fireExceptionCaught(cause);
+            if (close || cause instanceof IOException) {
+                shutdownInput(false);
+            }
+        }
+
+        @Override
+        RawSocketRecvByteAllocatorHandle newRawSocketHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+            return new RawSocketRecvByteAllocatorStreamingHandle(handle);
+        }
+
+        @Override
+        void RawSocketInReady() {
+            final ChannelConfig config = config();
+            if (shouldBreakRawSocketInReady(config)) {
+                clearRawSocketIn0();
+                return;
+            }
+            final RawSocketRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+            allocHandle.edgeTriggered(isFlagSet(Native.RawSocketET));
+
+            final ChannelPipeline pipeline = pipeline();
+            final ByteBufAllocator allocator = config.getAllocator();
+            allocHandle.reset(config);
+            RawSocketInBefore();
+
+            ByteBuf byteBuf = null;
+            boolean close = false;
+            try {
+                do {
+                    if (spliceQueue != null) {
+                        SpliceInTask spliceTask = spliceQueue.peek();
+                        if (spliceTask != null) {
+                            if (spliceTask.spliceIn(allocHandle)) {
+                                // We need to check if it is still active as if not we removed all SpliceTasks in
+                                // doClose(...)
+                                if (isActive()) {
+                                    spliceQueue.remove();
+                                }
+                                continue;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+
+                    // we use a direct buffer here as the native implementations only be able
+                    // to handle direct buffers.
+                    byteBuf = allocHandle.allocate(allocator);
+                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
+                    if (allocHandle.lastBytesRead() <= 0) {
+                        // nothing was read, release the buffer.
+                        byteBuf.release();
+                        byteBuf = null;
+                        close = allocHandle.lastBytesRead() < 0;
+                        if (close) {
+                            // There is nothing left to read as we received an EOF.
+                            readPending = false;
+                        }
+                        break;
+                    }
+                    allocHandle.incMessagesRead(1);
+                    readPending = false;
+                    pipeline.fireChannelRead(byteBuf);
+                    byteBuf = null;
+
+                    if (shouldBreakRawSocketInReady(config)) {
+                        // We need to do this for two reasons:
+                        //
+                        // - If the input was shutdown in between (which may be the case when the user did it in the
+                        //   fireChannelRead(...) method we should not try to read again to not produce any
+                        //   miss-leading exceptions.
+                        //
+                        // - If the user closes the channel we need to ensure we not try to read from it again as
+                        //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
+                        //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
+                        //   reading data from a filedescriptor that belongs to another socket then the socket that
+                        //   was "wrapped" by this Channel implementation.
+                        break;
+                    }
+                } while (allocHandle.continueReading());
+
+                allocHandle.readComplete();
+                pipeline.fireChannelReadComplete();
+
+                if (close) {
+                    shutdownInput(false);
+                }
+            } catch (Throwable t) {
+                handleReadException(pipeline, byteBuf, t, close, allocHandle);
+            } finally {
+                RawSocketInFinally(config);
+            }
+        }
+    }
+
+    private void addToSpliceQueue(final SpliceInTask task) {
+        EventLoop eventLoop = eventLoop();
+        if (eventLoop.inEventLoop()) {
+            addToSpliceQueue0(task);
+        } else {
+            eventLoop.execute(new Runnable() {
+                @Override
+                public void run() {
+                    addToSpliceQueue0(task);
+                }
+            });
+        }
+    }
+
+    private void addToSpliceQueue0(SpliceInTask task) {
+        if (spliceQueue == null) {
+            spliceQueue = PlatformDependent.newMpscQueue();
+        }
+        spliceQueue.add(task);
+    }
+
+    protected abstract class SpliceInTask {
+        final ChannelPromise promise;
+        int len;
+
+        protected SpliceInTask(int len, ChannelPromise promise) {
+            this.promise = promise;
+            this.len = len;
+        }
+
+        abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
+
+        protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
+            // calculate the maximum amount of data we are allowed to splice
+            int length = Math.min(handle.guess(), len);
+            int splicedIn = 0;
+            for (;;) {
+                // Splicing until there is nothing left to splice.
+                int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
+                if (localSplicedIn == 0) {
+                    break;
+                }
+                splicedIn += localSplicedIn;
+                length -= localSplicedIn;
+            }
+
+            return splicedIn;
+        }
+    }
+
+    // Let it directly implement channelFutureListener as well to reduce object creation.
+    private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
+        private final AbstractRawSocketStreamChannel ch;
+
+        SpliceInChannelTask(AbstractRawSocketStreamChannel ch, int len, ChannelPromise promise) {
+            super(len, promise);
+            this.ch = ch;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                promise.setFailure(future.cause());
+            }
+        }
+
+        @Override
+        public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+            assert ch.eventLoop().inEventLoop();
+            if (len == 0) {
+                promise.setSuccess();
+                return true;
+            }
+            try {
+                // We create the pipe on the target channel as this will allow us to just handle pending writes
+                // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
+                // on multiple Channels pointing to one target Channel.
+                FileDescriptor pipeOut = ch.pipeOut;
+                if (pipeOut == null) {
+                    // Create a new pipe as non was created before.
+                    FileDescriptor[] pipe = pipe();
+                    ch.pipeIn = pipe[0];
+                    pipeOut = ch.pipeOut = pipe[1];
+                }
+
+                int splicedIn = spliceIn(pipeOut, handle);
+                if (splicedIn > 0) {
+                    // Integer.MAX_VALUE is a special value which will result in splice forever.
+                    if (len != Integer.MAX_VALUE) {
+                        len -= splicedIn;
+                    }
+
+                    // Depending on if we are done with splicing inbound data we set the right promise for the
+                    // outbound splicing.
+                    final ChannelPromise splicePromise;
+                    if (len == 0) {
+                        splicePromise = promise;
+                    } else {
+                        splicePromise = ch.newPromise().addListener(this);
+                    }
+
+                    boolean autoRead = config().isAutoRead();
+
+                    // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
+                    // case.
+                    ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
+                    ch.unsafe().flush();
+                    if (autoRead && !splicePromise.isDone()) {
+                        // Write was not done which means the target channel was not writable. In this case we need to
+                        // disable reading until we are done with splicing to the target channel because:
+                        //
+                        // - The user may want to to trigger another splice operation once the splicing was complete.
+                        config().setAutoRead(false);
+                    }
+                }
+
+                return len == 0;
+            } catch (Throwable cause) {
+                promise.setFailure(cause);
+                return true;
+            }
+        }
+    }
+
+    private final class SpliceOutTask {
+        private final AbstractRawSocketStreamChannel ch;
+        private final boolean autoRead;
+        private int len;
+
+        SpliceOutTask(AbstractRawSocketStreamChannel ch, int len, boolean autoRead) {
+            this.ch = ch;
+            this.len = len;
+            this.autoRead = autoRead;
+        }
+
+        public boolean spliceOut() throws Exception {
+            assert ch.eventLoop().inEventLoop();
+            try {
+                int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
+                len -= splicedOut;
+                if (len == 0) {
+                    if (autoRead) {
+                        // AutoRead was used and we spliced everything so start reading again
+                        config().setAutoRead(true);
+                    }
+                    return true;
+                }
+                return false;
+            } catch (IOException e) {
+                if (autoRead) {
+                    // AutoRead was used and we spliced everything so start reading again
+                    config().setAutoRead(true);
+                }
+                throw e;
+            }
+        }
+    }
+
+    private final class SpliceFdTask extends SpliceInTask {
+        private final FileDescriptor fd;
+        private final ChannelPromise promise;
+        private final int offset;
+
+        SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
+            super(len, promise);
+            this.fd = fd;
+            this.promise = promise;
+            this.offset = offset;
+        }
+
+        @Override
+        public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+            assert eventLoop().inEventLoop();
+            if (len == 0) {
+                promise.setSuccess();
+                return true;
+            }
+
+            try {
+                FileDescriptor[] pipe = pipe();
+                FileDescriptor pipeIn = pipe[0];
+                FileDescriptor pipeOut = pipe[1];
+                try {
+                    int splicedIn = spliceIn(pipeOut, handle);
+                    if (splicedIn > 0) {
+                        // Integer.MAX_VALUE is a special value which will result in splice forever.
+                        if (len != Integer.MAX_VALUE) {
+                            len -= splicedIn;
+                        }
+                        do {
+                            int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
+                            splicedIn -= splicedOut;
+                        } while (splicedIn > 0);
+                        if (len == 0) {
+                            promise.setSuccess();
+                            return true;
+                        }
+                    }
+                    return false;
+                } finally {
+                    safeClosePipe(pipeIn);
+                    safeClosePipe(pipeOut);
+                }
+            } catch (Throwable cause) {
+                promise.setFailure(cause);
+                return true;
+            }
+        }
+    }
+
+    private final class RawSocketSocketWritableByteChannel extends SocketWritableByteChannel {
+        RawSocketSocketWritableByteChannel() {
+            super(socket);
+        }
+
+        @Override
+        protected ByteBufAllocator alloc() {
+            return AbstractRawSocketStreamChannel.this.alloc();
+        }
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
index 8ac21dd..6f4b76a 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
@@ -1,186 +1,118 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
 package org.apache.plc4x.java.utils.rawsockets.netty;
 
-import com.savarese.rocksaw.net.RawSocket;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.nio.AbstractNioByteChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.logging.InternalLogger;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectableChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
 
-/**
- * Netty channel implementation that uses RockSaw to create a raw socket connection to implement
- * IP-socket based protocols not based on TCP or UDP.
- *
- * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
- */
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannel extends AbstractRawSocketStreamChannel implements SocketChannel {
 
-    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
+    private final RawSocketChannelConfig config;
 
-    // The protocol number is defined in the IP protocol and indicates the type of protocol the payload
-    // the IP packet uses. This number is assigned by the IESG. A full list of the registered protocol
-    // numbers can be found here: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
-    private int protocolNumber;
+    private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
 
-    private RawSocket socket;
-    private InetSocketAddress localAddress;
-    private InetSocketAddress remoteAddress;
-
-    /**
-     * Initializes a raw socket that is able to communicate with raw IPv4 and IPv6 sockets, hereby
-     * allowing to implement protocols below TCP and UDP.
-     *
-     * For a list of public known protocol numbers see:
-     * https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
-     *
-     * @param parent
-     * @param ch
-     * @param protocolNumber protocol number identifying the protocol.
-     * @throws PlcIoException
-     */
-    public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
-        super(parent, ch);
-
-        this.protocolNumber = protocolNumber;
-
-        try {
-            socket = new RawSocket();
-            socket.setIPHeaderInclude(true);
-        } catch (IOException e) {
-            throw new PlcIoException("Error setting up raw socket", e);
-        }
+    public RawSocketChannel() {
+        super(newSocketStream(), false);
+        config = new RawSocketChannelConfig(this);
     }
 
-    /**
-     * Opens a connection to the given remote address.
-     *
-     * @param remoteAddress
-     * @param localAddress
-     * @return
-     * @throws Exception
-     */
-    @Override
-    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
-        if(!(remoteAddress instanceof InetSocketAddress) || !(localAddress instanceof InetSocketAddress)) {
-            throw new PlcIoException("Both remoteAddress and localAddress must be of type InetSocketAddress");
-        }
-
-        try {
-            this.localAddress = (InetSocketAddress) localAddress;
-            this.remoteAddress = (InetSocketAddress) remoteAddress;
-
-            socket.open(RawSocket.PF_INET, protocolNumber);
+    public RawSocketChannel(int fd) {
+        super(fd);
+        config = new RawSocketChannelConfig(this);
+    }
 
-            return socket.isOpen();
-        } catch (IllegalStateException | IOException e) {
-            return false;
-        }
+    RawSocketChannel(LinuxSocket fd, boolean active) {
+        super(fd, active);
+        config = new RawSocketChannelConfig(this);
     }
 
-    @Override
-    protected void doFinishConnect() throws Exception {
+    RawSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
+        super(parent, fd, remoteAddress);
+        config = new RawSocketChannelConfig(this);
 
+        if (parent instanceof RawSocketChannel) {
+            tcpMd5SigAddresses = ((RawSocketChannel) parent).tcpMd5SigAddresses();
+        }
     }
 
     /**
-     * Opens a listening socket.
-     *
-     * @param localAddress
-     * @throws Exception
+     * Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
      */
-    @Override
-    protected void doBind(SocketAddress localAddress) throws Exception {
-        if(socket.isOpen()) {
-            throw new PlcIoException("Raw socket already opened.");
-        }
-        if(localAddress instanceof InetSocketAddress) {
-            this.localAddress = (InetSocketAddress) localAddress;
-            socket.bind(this.localAddress.getAddress());
-        } else {
-            throw new PlcIoException("Unsupported type of local address. Only InetSocketAddress supported.");
-        }
+    public RawSocketTcpInfo tcpInfo() {
+        return tcpInfo(new RawSocketTcpInfo());
     }
 
     /**
-     * Closes the connection.
-     *
-     * @throws Exception
+     * Updates and returns the {@code TCP_INFO} for the current socket.
+     * See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
      */
-    @Override
-    protected void doDisconnect() throws Exception {
-        if(socket.isOpen()) {
-            socket.close();
+    public RawSocketTcpInfo tcpInfo(RawSocketTcpInfo info) {
+        try {
+            socket.getTcpInfo(info);
+            return info;
+        } catch (IOException e) {
+            throw new ChannelException(e);
         }
     }
 
     @Override
-    protected ChannelFuture shutdownInput() {
-        return null;
-    }
-
-    @Override
-    protected int doReadBytes(ByteBuf buf) throws Exception {
-        byte[] byteBuf = new byte[1024];
-        int readBytes = socket.read(byteBuf);
-        buf.writeBytes(byteBuf, 0, readBytes);
-        return readBytes;
+    public InetSocketAddress remoteAddress() {
+        return (InetSocketAddress) super.remoteAddress();
     }
 
     @Override
-    protected int doWriteBytes(ByteBuf buf) throws Exception {
-        byte[] readableBytes = new byte[buf.readableBytes()];
-        buf.readBytes(readableBytes);
-        socket.write(remoteAddress.getAddress(), readableBytes);
-        return readableBytes.length;
+    public InetSocketAddress localAddress() {
+        return (InetSocketAddress) super.localAddress();
     }
 
     @Override
-    protected long doWriteFileRegion(FileRegion region) throws Exception {
-        throw new UnsupportedOperationException("doWriteFileRegion not implemented");
+    public RawSocketChannelConfig config() {
+        return config;
     }
 
     @Override
-    protected SocketAddress localAddress0() {
-        return localAddress;
+    public ServerSocketChannel parent() {
+        return (ServerSocketChannel) super.parent();
     }
 
     @Override
-    protected SocketAddress remoteAddress0() {
-        return remoteAddress;
+    protected AbstractEpollUnsafe newUnsafe() {
+        return new EpollSocketChannelUnsafe();
     }
 
-    @Override
-    public ChannelConfig config() {
-        return null;
+    private final class EpollSocketChannelUnsafe extends RawSocketStreamUnsafe {
+        @Override
+        protected Executor prepareToClose() {
+            try {
+                // Check isOpen() first as otherwise it will throw a RuntimeException
+                // when call getSoLinger() as the fd is not valid anymore.
+                if (isOpen() && config().getSoLinger() > 0) {
+                    // We need to cancel this key of the channel so we may not end up in a eventloop spin
+                    // because we try to read or write until the actual close happens which may be later due
+                    // SO_LINGER handling.
+                    // See https://github.com/netty/netty/issues/4449
+                    ((RawSocketEventLoop) eventLoop()).remove(RawSocketChannel.this);
+                    return GlobalEventExecutor.INSTANCE;
+                }
+            } catch (Throwable ignore) {
+                // Ignore the error as the underlying channel may be closed in the meantime and so
+                // getSoLinger() may produce an exception. In this case we just return null.
+                // See https://github.com/netty/netty/issues/4449
+            }
+            return null;
+        }
     }
 
-    @Override
-    public boolean isActive() {
-        return socket.isOpen();
+    void setTcpMd5Sig(Map<InetAddress, byte[]> keys) throws IOException {
+        tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
     }
 }
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
new file mode 100644
index 0000000..8f77972
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static io.netty.channel.unix.Limits.SSIZE_MAX;
+
+public class RawSocketChannelConfig extends DefaultChannelConfig {
+    final AbstractRawSocketChannel channel;
+    private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
+
+    RawSocketChannelConfig(AbstractRawSocketChannel channel) {
+        super(channel);
+        this.channel = channel;
+    }
+
+    @Override
+    public Map<ChannelOption<?>, Object> getOptions() {
+        return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getOption(ChannelOption<T> option) {
+        if (option == EpollChannelOption.EPOLL_MODE) {
+            return (T) getEpollMode();
+        }
+        return super.getOption(option);
+    }
+
+    @Override
+    public <T> boolean setOption(ChannelOption<T> option, T value) {
+        validate(option, value);
+        if (option == EpollChannelOption.EPOLL_MODE) {
+            setEpollMode((EpollMode) value);
+        } else {
+            return super.setOption(option, value);
+        }
+        return true;
+    }
+
+    @Override
+    public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
+        super.setConnectTimeoutMillis(connectTimeoutMillis);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
+        super.setMaxMessagesPerRead(maxMessagesPerRead);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
+        super.setWriteSpinCount(writeSpinCount);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
+        super.setAllocator(allocator);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
+        if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
+            throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
+                    RecvByteBufAllocator.ExtendedHandle.class);
+        }
+        super.setRecvByteBufAllocator(allocator);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setAutoRead(boolean autoRead) {
+        super.setAutoRead(autoRead);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
+        super.setWriteBufferWaterMark(writeBufferWaterMark);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
+        super.setMessageSizeEstimator(estimator);
+        return this;
+    }
+
+    /**
+     * Return the {@link EpollMode} used. Default is
+     * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+     * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+     * {@link EpollMode#LEVEL_TRIGGERED}.
+     */
+    public EpollMode getEpollMode() {
+        return channel.isFlagSet(Native.EPOLLET)
+                ? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
+    }
+
+    /**
+     * Set the {@link EpollMode} used. Default is
+     * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+     * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+     * {@link EpollMode#LEVEL_TRIGGERED}.
+     *
+     * <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
+     */
+    public EpollChannelConfig setEpollMode(EpollMode mode) {
+        if (mode == null) {
+            throw new NullPointerException("mode");
+        }
+        try {
+            switch (mode) {
+            case EDGE_TRIGGERED:
+                checkChannelNotRegistered();
+                channel.setFlag(Native.EPOLLET);
+                break;
+            case LEVEL_TRIGGERED:
+                checkChannelNotRegistered();
+                channel.clearFlag(Native.EPOLLET);
+                break;
+            default:
+                throw new Error();
+            }
+        } catch (IOException e) {
+            throw new ChannelException(e);
+        }
+        return this;
+    }
+
+    private void checkChannelNotRegistered() {
+        if (channel.isRegistered()) {
+            throw new IllegalStateException("EpollMode can only be changed before channel is registered");
+        }
+    }
+
+    @Override
+    protected final void autoReadCleared() {
+        channel.clearEpollIn();
+    }
+
+    final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
+        this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
+    }
+
+    final long getMaxBytesPerGatheringWrite() {
+        return maxBytesPerGatheringWrite;
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
similarity index 97%
copy from plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
copy to plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
index 8ac21dd..cba1ad8 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
@@ -38,7 +38,7 @@ import java.nio.channels.SelectableChannel;
  *
  * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
  */
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannelSav extends AbstractNioByteChannel {
 
     private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
 
@@ -63,7 +63,7 @@ public class RawSocketChannel extends AbstractNioByteChannel {
      * @param protocolNumber protocol number identifying the protocol.
      * @throws PlcIoException
      */
-    public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
+    public RawSocketChannelSav(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
         super(parent, ch);
 
         this.protocolNumber = protocolNumber;
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
new file mode 100644
index 0000000..03e024d
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead.
+ *
+ * typedef union epoll_data {
+ *     void        *ptr;
+ *     int          fd;
+ *     uint32_t     u32;
+ *     uint64_t     u64;
+ * } epoll_data_t;
+ *
+ * struct epoll_event {
+ *     uint32_t     events;      // Epoll events
+ *     epoll_data_t data;        // User data variable
+ * };
+ *
+ * We use {@code fd} if the {@code epoll_data union} to store the actual file descriptor of an
+ * {@link AbstractRawSocketChannel} and so be able to map it later.
+ */
+final class RawSocketEventArray {
+    // Size of the epoll_event struct
+    private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent();
+    // The offsiet of the data union in the epoll_event struct
+    private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData();
+
+    private long memoryAddress;
+    private int length;
+
+    RawSocketEventArray(int length) {
+        if (length < 1) {
+            throw new IllegalArgumentException("length must be >= 1 but was " + length);
+        }
+        this.length = length;
+        memoryAddress = allocate(length);
+    }
+
+    private static long allocate(int length) {
+        return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE);
+    }
+
+    /**
+     * Return the {@code memoryAddress} which points to the start of this {@link RawSocketEventArray}.
+     */
+    long memoryAddress() {
+        return memoryAddress;
+    }
+
+    /**
+     * Return the length of the {@link RawSocketEventArray} which represent the maximum number of {@code epoll_events}
+     * that can be stored in it.
+     */
+    int length() {
+        return length;
+    }
+
+    /**
+     * Increase the storage of this {@link RawSocketEventArray}.
+     */
+    void increase() {
+        // double the size
+        length <<= 1;
+        free();
+        memoryAddress = allocate(length);
+    }
+
+    /**
+     * Free this {@link RawSocketEventArray}. Any usage after calling this method may segfault the JVM!
+     */
+    void free() {
+        PlatformDependent.freeMemory(memoryAddress);
+    }
+
+    /**
+     * Return the events for the {@code epoll_event} on this index.
+     */
+    int events(int index) {
+        return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE);
+    }
+
+    /**
+     * Return the file descriptor for the {@code epoll_event} on this index.
+     */
+    int fd(int index) {
+        return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET);
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
new file mode 100644
index 0000000..ac13e50
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SelectStrategy;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.util.IntSupplier;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.IntObjectMap;
+import io.netty.util.concurrent.RejectedExecutionHandler;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static java.lang.Math.min;
+
+/**
+ * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
+ */
+final class RawSocketEventLoop extends SingleThreadEventLoop {
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RawSocketEventLoop.class);
+    private static final AtomicIntegerFieldUpdater<RawSocketEventLoop> WAKEN_UP_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(RawSocketEventLoop.class, "wakenUp");
+
+    static {
+        // Ensure JNI is initialized by the time this class is loaded by this time!
+        // We use unix-common methods in this class which are backed by JNI methods.
+        Epoll.ensureAvailability();
+    }
+
+    private final FileDescriptor epollFd;
+    private final FileDescriptor eventFd;
+    private final FileDescriptor timerFd;
+    private final IntObjectMap<AbstractRawSocketChannel> channels = new IntObjectHashMap<AbstractRawSocketChannel>(4096);
+    private final boolean allowGrowing;
+    private final RawSocketEventArray events;
+    private final IovArray iovArray = new IovArray();
+    private final SelectStrategy selectStrategy;
+    private final IntSupplier selectNowSupplier = new IntSupplier() {
+        @Override
+        public int get() throws Exception {
+            return epollWaitNow();
+        }
+    };
+    private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
+        @Override
+        public Integer call() throws Exception {
+            return RawSocketEventLoop.super.pendingTasks();
+        }
+    };
+    private volatile int wakenUp;
+    private volatile int ioRatio = 50;
+
+    RawSocketEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
+                       SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
+        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
+        selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
+        if (maxEvents == 0) {
+            allowGrowing = true;
+            events = new RawSocketEventArray(4096);
+        } else {
+            allowGrowing = false;
+            events = new RawSocketEventArray(maxEvents);
+        }
+        boolean success = false;
+        FileDescriptor epollFd = null;
+        FileDescriptor eventFd = null;
+        FileDescriptor timerFd = null;
+        try {
+            this.epollFd = epollFd = Native.newEpollCreate();
+            this.eventFd = eventFd = Native.newEventFd();
+            try {
+                Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
+            }
+            this.timerFd = timerFd = Native.newTimerFd();
+            try {
+                Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
+            }
+            success = true;
+        } finally {
+            if (!success) {
+                if (epollFd != null) {
+                    try {
+                        epollFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+                if (eventFd != null) {
+                    try {
+                        eventFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+                if (timerFd != null) {
+                    try {
+                        timerFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
+     */
+    IovArray cleanArray() {
+        iovArray.clear();
+        return iovArray;
+    }
+
+    @Override
+    protected void wakeup(boolean inEventLoop) {
+        if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
+            // write to the evfd which will then wake-up epoll_wait(...)
+            Native.eventFdWrite(eventFd.intValue(), 1L);
+        }
+    }
+
+    /**
+     * Register the given epoll with this {@link EventLoop}.
+     */
+    void add(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+        int fd = ch.socket.intValue();
+        Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
+        channels.put(fd, ch);
+    }
+
+    /**
+     * The flags of the given epoll was modified so update the registration
+     */
+    void modify(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+        Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
+    }
+
+    /**
+     * Deregister the given epoll from this {@link EventLoop}.
+     */
+    void remove(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+
+        if (ch.isOpen()) {
+            int fd = ch.socket.intValue();
+            if (channels.remove(fd) != null) {
+                // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
+                // removed once the file-descriptor is closed.
+                Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
+            }
+        }
+    }
+
+    @Override
+    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
+        // This event loop never calls takeTask()
+        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
+                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
+    }
+
+    @Override
+    public int pendingTasks() {
+        // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
+        // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
+        // See https://github.com/netty/netty/issues/5297
+        if (inEventLoop()) {
+            return super.pendingTasks();
+        } else {
+            return submit(pendingTasksCallable).syncUninterruptibly().getNow();
+        }
+    }
+    /**
+     * Returns the percentage of the desired amount of time spent for I/O in the event loop.
+     */
+    public int getIoRatio() {
+        return ioRatio;
+    }
+
+    /**
+     * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
+     * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
+     */
+    public void setIoRatio(int ioRatio) {
+        if (ioRatio <= 0 || ioRatio > 100) {
+            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
+        }
+        this.ioRatio = ioRatio;
+    }
+
+    private int epollWait(boolean oldWakeup) throws IOException {
+        // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
+        // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
+        // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
+        // in pipeline.
+        if (oldWakeup && hasTasks()) {
+            return epollWaitNow();
+        }
+
+        long totalDelay = delayNanos(System.nanoTime());
+        int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
+        return Native.epollWait(epollFd, events, timerFd, delaySeconds,
+                (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
+    }
+
+    private int epollWaitNow() throws IOException {
+        return Native.epollWait(epollFd, events, timerFd, 0, 0);
+    }
+
+    @Override
+    protected void run() {
+        for (;;) {
+            try {
+                int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
+                switch (strategy) {
+                    case SelectStrategy.CONTINUE:
+                        continue;
+                    case SelectStrategy.SELECT:
+                        strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
+
+                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
+                        // before calling 'selector.wakeup()' to reduce the wake-up
+                        // overhead. (Selector.wakeup() is an expensive operation.)
+                        //
+                        // However, there is a race condition in this approach.
+                        // The race condition is triggered when 'wakenUp' is set to
+                        // true too early.
+                        //
+                        // 'wakenUp' is set to true too early if:
+                        // 1) Selector is waken up between 'wakenUp.set(false)' and
+                        //    'selector.select(...)'. (BAD)
+                        // 2) Selector is waken up between 'selector.select(...)' and
+                        //    'if (wakenUp.get()) { ... }'. (OK)
+                        //
+                        // In the first case, 'wakenUp' is set to true and the
+                        // following 'selector.select(...)' will wake up immediately.
+                        // Until 'wakenUp' is set to false again in the next round,
+                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
+                        // any attempt to wake up the Selector will fail, too, causing
+                        // the following 'selector.select(...)' call to block
+                        // unnecessarily.
+                        //
+                        // To fix this problem, we wake up the selector again if wakenUp
+                        // is true immediately after selector.select(...).
+                        // It is inefficient in that it wakes up the selector for both
+                        // the first case (BAD - wake-up required) and the second case
+                        // (OK - no wake-up required).
+
+                        if (wakenUp == 1) {
+                            Native.eventFdWrite(eventFd.intValue(), 1L);
+                        }
+                        // fallthrough
+                    default:
+                }
+
+                final int ioRatio = this.ioRatio;
+                if (ioRatio == 100) {
+                    try {
+                        if (strategy > 0) {
+                            processReady(events, strategy);
+                        }
+                    } finally {
+                        // Ensure we always run tasks.
+                        runAllTasks();
+                    }
+                } else {
+                    final long ioStartTime = System.nanoTime();
+
+                    try {
+                        if (strategy > 0) {
+                            processReady(events, strategy);
+                        }
+                    } finally {
+                        // Ensure we always run tasks.
+                        final long ioTime = System.nanoTime() - ioStartTime;
+                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
+                    }
+                }
+                if (allowGrowing && strategy == events.length()) {
+                    //increase the size of the array as we needed the whole space for the events
+                    events.increase();
+                }
+            } catch (Throwable t) {
+                handleLoopException(t);
+            }
+            // Always handle shutdown even if the loop processing threw an exception.
+            try {
+                if (isShuttingDown()) {
+                    closeAll();
+                    if (confirmShutdown()) {
+                        break;
+                    }
+                }
+            } catch (Throwable t) {
+                handleLoopException(t);
+            }
+        }
+    }
+
+    private static void handleLoopException(Throwable t) {
+        logger.warn("Unexpected exception in the selector loop.", t);
+
+        // Prevent possible consecutive immediate failures that lead to
+        // excessive CPU consumption.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Ignore.
+        }
+    }
+
+    private void closeAll() {
+        try {
+            epollWaitNow();
+        } catch (IOException ignore) {
+            // ignore on close
+        }
+        // Using the intermediate collection to prevent ConcurrentModificationException.
+        // In the `close()` method, the channel is deleted from `channels` map.
+        Collection<AbstractRawSocketChannel> array = new ArrayList<AbstractRawSocketChannel>(channels.size());
+
+        for (AbstractRawSocketChannel channel: channels.values()) {
+            array.add(channel);
+        }
+
+        for (AbstractRawSocketChannel ch: array) {
+            ch.unsafe().close(ch.unsafe().voidPromise());
+        }
+    }
+
+    private void processReady(RawSocketEventArray events, int ready) {
+        for (int i = 0; i < ready; i ++) {
+            final int fd = events.fd(i);
+            if (fd == eventFd.intValue()) {
+                // consume wakeup event.
+                Native.eventFdRead(fd);
+            } else if (fd == timerFd.intValue()) {
+                // consume wakeup event, necessary because the timer is added with ET mode.
+                Native.timerFdRead(fd);
+            } else {
+                final long ev = events.events(i);
+
+                AbstractRawSocketChannel ch = channels.get(fd);
+                if (ch != null) {
+                    // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
+                    // sure about it!
+                    // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
+                    // past.
+                    AbstractRawSocketUnsafe unsafe = (AbstractRawSocketUnsafe) ch.unsafe();
+
+                    // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
+                    // to read from the file descriptor.
+                    // See https://github.com/netty/netty/issues/3785
+                    //
+                    // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
+                    // In either case epollOutReady() will do the correct thing (finish connecting, or fail
+                    // the connection).
+                    // See https://github.com/netty/netty/issues/3848
+                    if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
+                        // Force flush of data as the epoll is writable again
+                        unsafe.epollOutReady();
+                    }
+
+                    // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
+                    // See https://github.com/netty/netty/issues/4317.
+                    //
+                    // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
+                    // try to read from the underlying file descriptor and so notify the user about the error.
+                    if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
+                        // The Channel is still open and there is something to read. Do it now.
+                        unsafe.epollInReady();
+                    }
+
+                    // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
+                    // we may close the channel directly or try to read more data depending on the state of the
+                    // Channel and als depending on the AbstractEpollChannel subtype.
+                    if ((ev & Native.EPOLLRDHUP) != 0) {
+                        unsafe.epollRdHupReady();
+                    }
+                } else {
+                    // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
+                    try {
+                        Native.epollCtlDel(epollFd.intValue(), fd);
+                    } catch (IOException ignore) {
+                        // This can happen but is nothing we need to worry about as we only try to delete
+                        // the fd from the epoll set as we not found it in our mappings. So this call to
+                        // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
+                        // deleted before or the file descriptor was closed before.
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup() {
+        try {
+            try {
+                epollFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the epoll fd.", e);
+            }
+            try {
+                eventFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the event fd.", e);
+            }
+            try {
+                timerFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the timer fd.", e);
+            }
+        } finally {
+            // release native memory
+            iovArray.release();
+            events.free();
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@plc4x.apache.org" <co...@plc4x.apache.org>.