Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/01/09 15:09:20 UTC

[incubator-plc4x] branch feature/PLC4X-18--raw-sockets updated (386c49d -> 9267533)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 386c49d  PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP
     add 2d1b219  stop RTE when passing empty or null messages
     add ce7f0c8  stop RTE when passing empty or null messages
     add 1e49bb5  stop RTE when passing empty or null messages
     add 9942cc3  sonar project properties
     add 70b1a8e  add .scannerwork
     add f49a686  ignore test and mock classes
     add 0bc9f43   private final static is the preferred order
     add 6b64873  fix version and add tests and coverage
     add fe4b740  add api coverage
     new a980d61  Merge branches 'feature/PLC4X-18--raw-sockets' and 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets
     new 03ba3ce  PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP
     new cbcd7e4  PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP  - Initial commit of this effort - Added a new module hierarchy for utility modules
     add ce7eaee  fix build by adding missing apache headers
     add 8ebeeef  PLC4X-21 - Code coverage doesn't seem to work
     add b3d4564  Test extension: - added Tests for Plc4XS7Protocol - added basic test implementation for S7Protocol
     add ef6084a  no need for abstract method as it's in base class and class is abstract
     add 641c99f  refactor and clean up switches
     add 63bba4d  clean up switch statements a little
     add f9d3fa1  no need for local variable
     add 02b8f57  unnneeded import
     add c4c9ac2  statics come first
     add 8917742  prefer parseInt to valueOf
     add c0c3bb1  local var had some name as field
     add 4166952  local var had some name as field
     add 4612ae3  if you don't return null to here's no need to check for it
     add 4619484  exception can't be throw
     add 125e630  no need for nested ifs
     add 708f563  Merge remote-tracking branch 'origin/master'
     add 4eadf74  PLC4X-21 - Code coverage doesn't seem to work
     add 08fc594  PLC4X-21 - Code coverage doesn't seem to work
     add d0adfde  PLC4X-21 - Code coverage doesn't seem to work
     add 0a22610  PLC4X-22: temporary disable camel BOM due to long build-time on site:site
     add ff882df  fix up switch and endless loops
     add 04245aa  refactor to reduce complexity and fix endless loops
     add 4383c38  log exception
     add 1e853eb  add @override
     add 79f031d  unneeded brackets
     add 7df5be8  SONAR Make the programs terminate (getting rid of the two blocker issues)
     add 464d125  SONAR Some further fixing of issues sonarqube reported
     add 74617fc  mask slot number
     add ecba61f  add calling parameter test
     add 7b849a1  get rid of a level of nesting
     add 5ad5d2d  no real need to calculate size/length every time through the loop
     add bd85953  add more encode and decode parameter tests
     add 44e01ea  Sort divided by a int is always an int no need to round up with Math.ceil
     add 25a752c  Enable coverage checks (Report warnings but don't break the build for now)
     new 9267533  Merge branch 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |    1 +
 Jenkinsfile                                        |    4 +-
 applications/iotree/pom.xml                        |    5 +
 .../plc4x/java/applications/iotree/IoTree.java     |   20 +-
 applications/plclogger/pom.xml                     |    5 +
 .../java/applications/plclogger/PlcLogger.java     |   20 +-
 integrations/apache-camel/pom.xml                  |   23 +-
 .../java/org/apache/plc4x/camel/PLC4XProducer.java |    2 +
 .../apache/plc4x/edgent/PlcConnectionAdapter.java  |   10 +-
 .../java/api/connection/AbstractPlcConnection.java |    4 -
 .../PlcUsernamePasswordAuthenticationTest.java     |    0
 .../plc4x/java/api/messages/APIMessageTests.java   |    0
 .../plc4x/java/api/messages/mock/MockAddress.java  |    0
 plc4j/pom.xml                                      |   40 +-
 .../plc4x/java/isotp/netty/IsoTPProtocol.java      |  383 +++----
 .../java/isotp/netty/model/types/DeviceGroup.java  |    4 +-
 .../isotp/netty/model/types/DisconnectReason.java  |    4 +-
 .../isotp/netty/model/types/ParameterCode.java     |    4 +-
 .../isotp/netty/model/types/ProtocolClass.java     |    4 +-
 .../java/isotp/netty/model/types/RejectCause.java  |    4 +-
 .../java/isotp/netty/model/types/TpduCode.java     |    4 +-
 .../java/isotp/netty/model/types/TpduSize.java     |    4 +-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |    8 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |   75 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java |  233 +++--
 .../netty/model/types/DataTransportErrorCode.java  |    4 +-
 .../s7/netty/model/types/DataTransportSize.java    |    4 +-
 .../java/s7/netty/model/types/MemoryArea.java      |    4 +-
 .../java/s7/netty/model/types/MessageType.java     |    4 +-
 .../java/s7/netty/model/types/ParameterType.java   |    4 +-
 .../s7/netty/model/types/SpecificationType.java    |    4 +-
 .../java/s7/netty/model/types/TransportSize.java   |    4 +-
 .../netty/model/types/VariableAddressingMode.java  |    4 +-
 .../plc4x/java/isotp/netty/IsoTPProtocolTest.java  |  324 +++++-
 .../apache/plc4x/java/isotp/netty/MockChannel.java |   18 +
 .../isotp/netty/MockChannelHandlerContext.java     |   18 +
 .../java/isotp/netty/MockChannelPipeline.java      |   18 +
 .../apache/plc4x/java/s7/S7PlcReaderSample.java    |    4 +-
 .../org/apache/plc4x/java/s7/S7PlcScanner.java     |    2 +-
 .../org/apache/plc4x/java/s7/S7PlcTestConsole.java |    2 +-
 .../apache/plc4x/java/s7/S7PlcWriterSample.java    |    2 +-
 .../plc4x/java/s7/netty/Plc4XS7ProtocolTest.java   |  205 +++-
 .../apache/plc4x/java/s7/netty/S7ProtocolTest.java |   62 +-
 .../rawsockets/netty/AbstractRawSocketChannel.java |  740 ++++++++++++++
 .../netty/AbstractRawSocketStreamChannel.java      | 1042 ++++++++++++++++++++
 .../utils/rawsockets/netty/RawSocketChannel.java   |  214 ++--
 .../rawsockets/netty/RawSocketChannelConfig.java   |  187 ++++
 ...SocketChannel.java => RawSocketChannelSav.java} |    4 +-
 .../rawsockets/netty/RawSocketEventArray.java      |  104 ++
 .../utils/rawsockets/netty/RawSocketEventLoop.java |  449 +++++++++
 plc4j/{api => utils/wireshark-utils}/pom.xml       |    8 +-
 pom.xml                                            |  263 +++--
 sonar-project.properties.sav                       |   46 +
 53 files changed, 3920 insertions(+), 685 deletions(-)
 rename plc4j/api/src/test/{ => java}/org/apache/plc4x/java/api/authentication/PlcUsernamePasswordAuthenticationTest.java (100%)
 rename plc4j/api/src/test/{ => java}/org/apache/plc4x/java/api/messages/APIMessageTests.java (100%)
 rename plc4j/api/src/test/{ => java}/org/apache/plc4x/java/api/messages/mock/MockAddress.java (100%)
 create mode 100644 plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
 create mode 100644 plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
 create mode 100644 plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
 copy plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/{RawSocketChannel.java => RawSocketChannelSav.java} (97%)
 create mode 100644 plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
 create mode 100644 plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
 copy plc4j/{api => utils/wireshark-utils}/pom.xml (80%)
 create mode 100644 sonar-project.properties.sav


[incubator-plc4x] 03/04: PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP - Initial commit of this effort - Added a new module hierarchy for utility modules

Posted by cd...@apache.org.

cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit cbcd7e4e8562fbd99d10b11e98bc59339d1fd15a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Jan 5 09:18:15 2018 +0100

    PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP  - Initial commit of this effort - Added a new module hierarchy for utility modules
---
 plc4j/pom.xml                                      |    2 +-
 .../rawsockets/netty/AbstractRawSocketChannel.java |  740 ++++++++++++++
 .../netty/AbstractRawSocketStreamChannel.java      | 1042 ++++++++++++++++++++
 .../utils/rawsockets/netty/RawSocketChannel.java   |  214 ++--
 .../rawsockets/netty/RawSocketChannelConfig.java   |  187 ++++
 ...SocketChannel.java => RawSocketChannelSav.java} |    4 +-
 .../rawsockets/netty/RawSocketEventArray.java      |  104 ++
 .../utils/rawsockets/netty/RawSocketEventLoop.java |  449 +++++++++
 8 files changed, 2598 insertions(+), 144 deletions(-)
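
Reviewer note (not part of the commit): the setFlag(), clearFlag() and
isFlagSet() helpers in AbstractRawSocketChannel in the diff below keep the
epoll interest set in a single int and only call modifyEvents() when a bit
actually changes. The following is a minimal standalone sketch of that
pattern, assuming nothing beyond the JDK. The class name FlagMaskSketch, the
FLAG_* constants and the modifyCalls counter are hypothetical stand-ins
chosen for illustration; they are not the Native.EPOLL* constants or the
RawSocketEventLoop used by the patch.

```java
// Hypothetical, JDK-only sketch of the bitmask bookkeeping used by
// setFlag()/clearFlag()/isFlagSet() in the patch. Not the patch's code.
final class FlagMaskSketch {
    // Illustrative bit values standing in for the event flags.
    static final int FLAG_IN = 0x01;
    static final int FLAG_OUT = 0x04;
    static final int FLAG_ET = 0x80000000;

    // Start with the edge-triggered bit set, as the patch does with its flags field.
    private int flags = FLAG_ET;

    // Counts how often the (simulated) event registration would be updated.
    private int modifyCalls;

    boolean isFlagSet(int flag) {
        return (flags & flag) != 0;
    }

    void setFlag(int flag) {
        // Only touch the registration when the bit is not already set.
        if (!isFlagSet(flag)) {
            flags |= flag;
            modifyEvents();
        }
    }

    void clearFlag(int flag) {
        // Only touch the registration when the bit is currently set.
        if (isFlagSet(flag)) {
            flags &= ~flag;
            modifyEvents();
        }
    }

    int modifyCalls() {
        return modifyCalls;
    }

    // Stand-in for re-registering the channel with the event loop.
    private void modifyEvents() {
        modifyCalls++;
    }

    public static void main(String[] args) {
        FlagMaskSketch channel = new FlagMaskSketch();
        channel.setFlag(FLAG_IN);    // bit changes, registration updated
        channel.setFlag(FLAG_IN);    // already set, nothing happens
        channel.clearFlag(FLAG_OUT); // never set, nothing happens
        channel.clearFlag(FLAG_IN);  // bit changes, registration updated
        System.out.println("registration updates: " + channel.modifyCalls());
    }
}
```

The guard around each update is the point of the sketch: redundant calls
leave both the mask and the registration untouched, which is why the patch
can call setFlag(readFlag) on every doBeginRead() without extra cost.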

diff --git a/plc4j/pom.xml b/plc4j/pom.xml
index e9932cb..62d0025 100644
--- a/plc4j/pom.xml
+++ b/plc4j/pom.xml
@@ -39,7 +39,7 @@
     <module>api</module>
     <module>core</module>
     <module>protocols</module>
-    <module>utils</module>
+    <!--module>utils</module-->
   </modules>
 
   <dependencies>
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
new file mode 100644
index 0000000..a8762ad
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
@@ -0,0 +1,740 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.channel.socket.ChannelInputShutdownReadComplete;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.Socket;
+import io.netty.channel.unix.UnixChannel;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ThrowableUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+abstract class AbstractRawSocketChannel extends AbstractChannel implements Channel {
+    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new ClosedChannelException(), AbstractRawSocketChannel.class, "doClose()");
+    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
+    private final int readFlag;
+    final LinuxSocket socket;
+    /**
+     * The future of the current connection attempt.  If not null, subsequent
+     * connection attempts will fail.
+     */
+    private ChannelPromise connectPromise;
+    private ScheduledFuture<?> connectTimeoutFuture;
+    private SocketAddress requestedRemoteAddress;
+
+    private volatile SocketAddress local;
+    private volatile SocketAddress remote;
+
+    protected int flags = Native.EPOLLET;
+    boolean inputClosedSeenErrorOnRead;
+    boolean epollInReadyRunnablePending;
+
+    protected volatile boolean active;
+
+    AbstractRawSocketChannel(LinuxSocket fd, int flag) {
+        this(null, fd, flag, false);
+    }
+
+    AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, boolean active) {
+        super(parent);
+        socket = checkNotNull(fd, "fd");
+        readFlag = flag;
+        flags |= flag;
+        this.active = active;
+        if (active) {
+            // Directly cache the remote and local addresses
+            // See https://github.com/netty/netty/issues/2359
+            local = fd.localAddress();
+            remote = fd.remoteAddress();
+        }
+    }
+
+    AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) {
+        super(parent);
+        socket = checkNotNull(fd, "fd");
+        readFlag = flag;
+        flags |= flag;
+        active = true;
+        // Directly cache the remote and local addresses
+        // See https://github.com/netty/netty/issues/2359
+        this.remote = remote;
+        local = fd.localAddress();
+    }
+
+    static boolean isSoErrorZero(Socket fd) {
+        try {
+            return fd.getSoError() == 0;
+        } catch (IOException e) {
+            throw new ChannelException(e);
+        }
+    }
+
+    void setFlag(int flag) throws IOException {
+        if (!isFlagSet(flag)) {
+            flags |= flag;
+            modifyEvents();
+        }
+    }
+
+    void clearFlag(int flag) throws IOException {
+        if (isFlagSet(flag)) {
+            flags &= ~flag;
+            modifyEvents();
+        }
+    }
+
+    boolean isFlagSet(int flag) {
+        return (flags & flag) != 0;
+    }
+
+    @Override
+    public final FileDescriptor fd() {
+        return socket;
+    }
+
+    @Override
+    public abstract RawSocketChannelConfig config();
+
+    @Override
+    public boolean isActive() {
+        return active;
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return METADATA;
+    }
+
+    @Override
+    protected void doClose() throws Exception {
+        active = false;
+        // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
+        // socket which has not even been connected yet. This has been observed to block during unit tests.
+        inputClosedSeenErrorOnRead = true;
+        try {
+            ChannelPromise promise = connectPromise;
+            if (promise != null) {
+                // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+                promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
+                connectPromise = null;
+            }
+
+            ScheduledFuture<?> future = connectTimeoutFuture;
+            if (future != null) {
+                future.cancel(false);
+                connectTimeoutFuture = null;
+            }
+
+            if (isRegistered()) {
+                // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
+                // if SO_LINGER is used.
+                //
+                // See https://github.com/netty/netty/issues/7159
+                EventLoop loop = eventLoop();
+                if (loop.inEventLoop()) {
+                    doDeregister();
+                } else {
+                    loop.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                doDeregister();
+                            } catch (Throwable cause) {
+                                pipeline().fireExceptionCaught(cause);
+                            }
+                        }
+                    });
+                }
+            }
+        } finally {
+            socket.close();
+        }
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+        doClose();
+    }
+
+    @Override
+    protected boolean isCompatible(EventLoop loop) {
+        return loop instanceof RawSocketEventLoop;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socket.isOpen();
+    }
+
+    @Override
+    protected void doDeregister() throws Exception {
+        ((RawSocketEventLoop) eventLoop()).remove(this);
+    }
+
+    @Override
+    protected final void doBeginRead() throws Exception {
+        // Channel.read() or ChannelHandlerContext.read() was called
+        final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+        unsafe.readPending = true;
+
+        // We must set the read flag here as it is possible the user didn't read in the last read loop, the
+        // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
+        // never get data after this.
+        setFlag(readFlag);
+
+        // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
+        // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
+        if (unsafe.maybeMoreDataToRead) {
+            unsafe.executeEpollInReadyRunnable(config());
+        }
+    }
+
+    final boolean shouldBreakEpollInReady(ChannelConfig config) {
+        return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
+    }
+
+    private static boolean isAllowHalfClosure(ChannelConfig config) {
+        return config instanceof RawSocketSocketChannelConfig &&
+                ((RawSocketSocketChannelConfig) config).isAllowHalfClosure();
+    }
+
+    final void clearEpollIn() {
+        // Only clear if registered with an EventLoop as otherwise
+        if (isRegistered()) {
+            final EventLoop loop = eventLoop();
+            final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+            if (loop.inEventLoop()) {
+                unsafe.clearEpollIn0();
+            } else {
+                // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
+                loop.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        if (!unsafe.readPending && !config().isAutoRead()) {
+                            // Still no read triggered so clear it now
+                            unsafe.clearEpollIn0();
+                        }
+                    }
+                });
+            }
+        } else  {
+            // The EventLoop is not registered atm so just update the flags so the correct value
+            // will be used once the channel is registered
+            flags &= ~readFlag;
+        }
+    }
+
+    private void modifyEvents() throws IOException {
+        if (isOpen() && isRegistered()) {
+            ((RawSocketEventLoop) eventLoop()).modify(this);
+        }
+    }
+
+    @Override
+    protected void doRegister() throws Exception {
+        // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
+        // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
+        // new EventLoop.
+        epollInReadyRunnablePending = false;
+        ((RawSocketEventLoop) eventLoop()).add(this);
+    }
+
+    @Override
+    protected abstract AbstractEpollUnsafe newUnsafe();
+
+    /**
+     * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
+     */
+    protected final ByteBuf newDirectBuffer(ByteBuf buf) {
+        return newDirectBuffer(buf, buf);
+    }
+
+    /**
+     * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
+     * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
+     * this method.
+     */
+    protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
+        final int readableBytes = buf.readableBytes();
+        if (readableBytes == 0) {
+            ReferenceCountUtil.release(holder);
+            return Unpooled.EMPTY_BUFFER;
+        }
+
+        final ByteBufAllocator alloc = alloc();
+        if (alloc.isDirectBufferPooled()) {
+            return newDirectBuffer0(holder, buf, alloc, readableBytes);
+        }
+
+        final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
+        if (directBuf == null) {
+            return newDirectBuffer0(holder, buf, alloc, readableBytes);
+        }
+
+        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
+        ReferenceCountUtil.safeRelease(holder);
+        return directBuf;
+    }
+
+    private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
+        final ByteBuf directBuf = alloc.directBuffer(capacity);
+        directBuf.writeBytes(buf, buf.readerIndex(), capacity);
+        ReferenceCountUtil.safeRelease(holder);
+        return directBuf;
+    }
+
+    protected static void checkResolvable(InetSocketAddress addr) {
+        if (addr.isUnresolved()) {
+            throw new UnresolvedAddressException();
+        }
+    }
+
+    /**
+     * Read bytes into the given {@link ByteBuf} and return the amount.
+     */
+    protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
+        int writerIndex = byteBuf.writerIndex();
+        int localReadAmount;
+        unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
+        if (byteBuf.hasMemoryAddress()) {
+            localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
+        } else {
+            ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
+            localReadAmount = socket.read(buf, buf.position(), buf.limit());
+        }
+        if (localReadAmount > 0) {
+            byteBuf.writerIndex(writerIndex + localReadAmount);
+        }
+        return localReadAmount;
+    }
+
+    protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+        if (buf.hasMemoryAddress()) {
+            int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
+            if (localFlushedAmount > 0) {
+                in.removeBytes(localFlushedAmount);
+                return 1;
+            }
+        } else {
+            final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
+                    buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
+            int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
+            if (localFlushedAmount > 0) {
+                nioBuf.position(nioBuf.position() + localFlushedAmount);
+                in.removeBytes(localFlushedAmount);
+                return 1;
+            }
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
+        boolean readPending;
+        boolean maybeMoreDataToRead;
+        private RawSocketRecvByteAllocatorHandle allocHandle;
+        private final Runnable epollInReadyRunnable = new Runnable() {
+            @Override
+            public void run() {
+                epollInReadyRunnablePending = false;
+                epollInReady();
+            }
+        };
+
+        /**
+         * Called once EPOLLIN event is ready to be processed
+         */
+        abstract void epollInReady();
+
+        final void epollInBefore() { maybeMoreDataToRead = false; }
+
+        final void epollInFinally(ChannelConfig config) {
+            maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
+            // Check if there is a readPending which was not processed yet.
+            // This could be for two reasons:
+            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
+            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
+            //
+            // See https://github.com/netty/netty/issues/2254
+            if (!readPending && !config.isAutoRead()) {
+                clearEpollIn();
+            } else if (readPending && maybeMoreDataToRead) {
+                // trigger a read again as there may be something left to read and because of epoll ET we
+                // will not get notified again until we read everything from the socket
+                //
+                // It is possible the last fireChannelRead call could cause the user to call read() again, or if
+                // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
+                // to false before every read operation to prevent re-entry into epollInReady() we will not read from
+                // the underlying OS again unless the user happens to call read again.
+                executeEpollInReadyRunnable(config);
+            }
+        }
+
+        final void executeEpollInReadyRunnable(ChannelConfig config) {
+            if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
+                return;
+            }
+            epollInReadyRunnablePending = true;
+            eventLoop().execute(epollInReadyRunnable);
+        }
+
+        /**
+         * Called once EPOLLRDHUP event is ready to be processed
+         */
+        final void epollRdHupReady() {
+            // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
+            recvBufAllocHandle().receivedRdHup();
+
+            if (isActive()) {
+                // If it is still active, we need to call epollInReady as otherwise we may miss to
+                // read pending data from the underlying file descriptor.
+                // See https://github.com/netty/netty/issues/3709
+                epollInReady();
+            } else {
+                // Just to be safe make sure the input marked as closed.
+                shutdownInput(true);
+            }
+
+            // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
+            clearEpollRdHup();
+        }
+
+        /**
+         * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
+         */
+        private void clearEpollRdHup() {
+            try {
+                clearFlag(Native.EPOLLRDHUP);
+            } catch (IOException e) {
+                pipeline().fireExceptionCaught(e);
+                close(voidPromise());
+            }
+        }
+
+        /**
+         * Shutdown the input side of the channel.
+         */
+        void shutdownInput(boolean rdHup) {
+            if (!socket.isInputShutdown()) {
+                if (isAllowHalfClosure(config())) {
+                    try {
+                        socket.shutdown(true, false);
+                    } catch (IOException ignored) {
+                        // We attempted to shutdown and failed, which means the input has already effectively been
+                        // shutdown.
+                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
+                        return;
+                    } catch (NotYetConnectedException ignore) {
+                        // We attempted to shutdown and failed, which means the input has already effectively been
+                        // shutdown.
+                    }
+                    clearEpollIn();
+                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
+                } else {
+                    close(voidPromise());
+                }
+            } else if (!rdHup) {
+                inputClosedSeenErrorOnRead = true;
+                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
+            }
+        }
+
+        private void fireEventAndClose(Object evt) {
+            pipeline().fireUserEventTriggered(evt);
+            close(voidPromise());
+        }
+
+        @Override
+        public RawSocketRecvByteAllocatorHandle recvBufAllocHandle() {
+            if (allocHandle == null) {
+                allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
+            }
+            return allocHandle;
+        }
+
+        /**
+         * Create a new {@link RawSocketRecvByteAllocatorHandle} instance.
+         * @param handle The handle to wrap with EPOLL specific logic.
+         */
+        RawSocketRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+            return new RawSocketRecvByteAllocatorHandle(handle);
+        }
+
+        @Override
+        protected void flush0() {
+            // Flush immediately only when there's no pending flush.
+            // If there's a pending flush operation, event loop will call forceFlush() later,
+            // and thus there's no need to call it now.
+            if (isFlagSet(Native.EPOLLOUT)) {
+                return;
+            }
+            super.flush0();
+        }
+
+        /**
+         * Called once a EPOLLOUT event is ready to be processed
+         */
+        final void epollOutReady() {
+            if (connectPromise != null) {
+                // pending connect which is now complete so handle it.
+                finishConnect();
+            } else if (!socket.isOutputShutdown()) {
+                // directly call super.flush0() to force a flush now
+                super.flush0();
+            }
+        }
+
+        protected final void clearEpollIn0() {
+            assert eventLoop().inEventLoop();
+            try {
+                readPending = false;
+                clearFlag(readFlag);
+            } catch (IOException e) {
+                // When this happens there is something completely wrong with either the filedescriptor or epoll,
+                // so fire the exception through the pipeline and close the Channel.
+                pipeline().fireExceptionCaught(e);
+                unsafe().close(unsafe().voidPromise());
+            }
+        }
+
+        @Override
+        public void connect(
+                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
+            if (!promise.setUncancellable() || !ensureOpen(promise)) {
+                return;
+            }
+
+            try {
+                if (connectPromise != null) {
+                    throw new ConnectionPendingException();
+                }
+
+                boolean wasActive = isActive();
+                if (doConnect(remoteAddress, localAddress)) {
+                    fulfillConnectPromise(promise, wasActive);
+                } else {
+                    connectPromise = promise;
+                    requestedRemoteAddress = remoteAddress;
+
+                    // Schedule connect timeout.
+                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
+                    if (connectTimeoutMillis > 0) {
+                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
+                            @Override
+                            public void run() {
+                                ChannelPromise connectPromise = AbstractRawSocketChannel.this.connectPromise;
+                                ConnectTimeoutException cause =
+                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
+                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
+                                    close(voidPromise());
+                                }
+                            }
+                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
+                    }
+
+                    promise.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            if (future.isCancelled()) {
+                                if (connectTimeoutFuture != null) {
+                                    connectTimeoutFuture.cancel(false);
+                                }
+                                connectPromise = null;
+                                close(voidPromise());
+                            }
+                        }
+                    });
+                }
+            } catch (Throwable t) {
+                closeIfClosed();
+                promise.tryFailure(annotateConnectException(t, remoteAddress));
+            }
+        }
+
+        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
+            if (promise == null) {
+                // Closed via cancellation and the promise has been notified already.
+                return;
+            }
+            active = true;
+
+            // Get the state as trySuccess() may trigger a ChannelFutureListener that will close the Channel.
+            // We still need to ensure we call fireChannelActive() in this case.
+            boolean active = isActive();
+
+            // trySuccess() will return false if a user cancelled the connection attempt.
+            boolean promiseSet = promise.trySuccess();
+
+            // Regardless of whether the connection attempt was cancelled, channelActive() should be triggered,
+            // because what happened is what happened.
+            if (!wasActive && active) {
+                pipeline().fireChannelActive();
+            }
+
+            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
+            if (!promiseSet) {
+                close(voidPromise());
+            }
+        }
+
+        private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
+            if (promise == null) {
+                // Closed via cancellation and the promise has been notified already.
+                return;
+            }
+
+            // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+            promise.tryFailure(cause);
+            closeIfClosed();
+        }
+
+        private void finishConnect() {
+            // Note this method is invoked by the event loop only if the connection attempt was
+            // neither cancelled nor timed out.
+
+            assert eventLoop().inEventLoop();
+
+            boolean connectStillInProgress = false;
+            try {
+                boolean wasActive = isActive();
+                if (!doFinishConnect()) {
+                    connectStillInProgress = true;
+                    return;
+                }
+                fulfillConnectPromise(connectPromise, wasActive);
+            } catch (Throwable t) {
+                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
+            } finally {
+                if (!connectStillInProgress) {
+                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
+                    // See https://github.com/netty/netty/issues/1770
+                    if (connectTimeoutFuture != null) {
+                        connectTimeoutFuture.cancel(false);
+                    }
+                    connectPromise = null;
+                }
+            }
+        }
+
+        /**
+         * Finish the connect
+         */
+        private boolean doFinishConnect() throws Exception {
+            if (socket.finishConnect()) {
+                clearFlag(Native.EPOLLOUT);
+                if (requestedRemoteAddress instanceof InetSocketAddress) {
+                    remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
+                }
+                requestedRemoteAddress = null;
+
+                return true;
+            }
+            setFlag(Native.EPOLLOUT);
+            return false;
+        }
+    }
+
+    @Override
+    protected void doBind(SocketAddress local) throws Exception {
+        if (local instanceof InetSocketAddress) {
+            checkResolvable((InetSocketAddress) local);
+        }
+        socket.bind(local);
+        this.local = socket.localAddress();
+    }
+
+    /**
+     * Connect to the remote peer
+     */
+    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        if (localAddress instanceof InetSocketAddress) {
+            checkResolvable((InetSocketAddress) localAddress);
+        }
+
+        InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
+                ? (InetSocketAddress) remoteAddress : null;
+        if (remoteSocketAddr != null) {
+            checkResolvable(remoteSocketAddr);
+        }
+
+        if (remote != null) {
+            // Check if already connected before trying to connect. This is needed because connect(...) does not
+            // return -1 with errno set to EISCONN if a previous connect(...) attempt set errno to EINPROGRESS and
+            // finished later.
+            throw new AlreadyConnectedException();
+        }
+
+        if (localAddress != null) {
+            socket.bind(localAddress);
+        }
+
+        boolean connected = doConnect0(remoteAddress);
+        if (connected) {
+            remote = remoteSocketAddr == null ?
+                    remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
+        }
+        // We always need to set the localAddress even if not connected yet as the bind already took place.
+        //
+        // See https://github.com/netty/netty/issues/3463
+        local = socket.localAddress();
+        return connected;
+    }
+
+    private boolean doConnect0(SocketAddress remote) throws Exception {
+        boolean success = false;
+        try {
+            boolean connected = socket.connect(remote);
+            if (!connected) {
+                setFlag(Native.EPOLLOUT);
+            }
+            success = true;
+            return connected;
+        } finally {
+            if (!success) {
+                doClose();
+            }
+        }
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return local;
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return remote;
+    }
+}
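
The success-flag idiom that doConnect0(...) uses above (close the socket only when connect throws, never on a normal return) can be sketched in isolation. This is a minimal illustration, not code from the commit: FakeSocket, connectOrClose and closedAfterAttempt are hypothetical names, and the simulated failure is an unchecked exception purely to keep the sketch short.

```java
public class CloseOnFailureSketch {

    /** Hypothetical stand-in for the native socket; not a Netty or PLC4X class. */
    static final class FakeSocket {
        private final boolean failOnConnect;
        private boolean closed;

        FakeSocket(boolean failOnConnect) {
            this.failOnConnect = failOnConnect;
        }

        /** Returns false to model a non-blocking connect that is still in progress. */
        boolean connect() {
            if (failOnConnect) {
                throw new IllegalStateException("simulated connect failure");
            }
            return false;
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Mirrors the shape of doConnect0: the flag is only set once connect() returned normally. */
    static boolean connectOrClose(FakeSocket socket) {
        boolean success = false;
        try {
            boolean connected = socket.connect();
            success = true;
            return connected;
        } finally {
            // Runs on both paths, but only closes when connect() threw.
            if (!success) {
                socket.close();
            }
        }
    }

    /** Attempts a connect and reports whether the socket ended up closed. */
    public static boolean closedAfterAttempt(boolean failOnConnect) {
        FakeSocket socket = new FakeSocket(failOnConnect);
        try {
            connectOrClose(socket);
        } catch (IllegalStateException expected) {
            // The failure still reaches the caller; the socket has already been closed.
        }
        return socket.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("pending connect closed the socket: " + closedAfterAttempt(false));
        System.out.println("failed connect closed the socket: " + closedAfterAttempt(true));
    }
}
```

A connect that is merely still pending (return value false) leaves the socket open so the event loop can finish it on EPOLLOUT. Only an exception triggers the close, and that exception still propagates to the caller.
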
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
new file mode 100644
index 0000000..a1f2ff4
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
@@ -0,0 +1,1042 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+import io.netty.channel.internal.ChannelUtils;
+import io.netty.channel.socket.DuplexChannel;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.IovArray;
+import io.netty.channel.unix.SocketWritableByteChannel;
+import io.netty.channel.unix.UnixChannelUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.StringUtil;
+import io.netty.util.internal.ThrowableUtil;
+import io.netty.util.internal.UnstableApi;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.FileDescriptor.pipe;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+public abstract class AbstractRawSocketStreamChannel extends AbstractRawSocketChannel implements DuplexChannel {
+    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
+    private static final String EXPECTED_TYPES =
+            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
+                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRawSocketStreamChannel.class);
+    private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
+            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+                    AbstractRawSocketStreamChannel.class, "clearSpliceQueue()");
+    private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new ClosedChannelException(),
+            AbstractRawSocketStreamChannel.class, "spliceTo(...)");
+    private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
+            ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+            AbstractRawSocketStreamChannel.class, "failSpliceIfClosed(...)");
+    private final Runnable flushTask = new Runnable() {
+        @Override
+        public void run() {
+            flush();
+        }
+    };
+    private Queue<SpliceInTask> spliceQueue;
+
+    // Lazy init these if we need to splice(...)
+    private FileDescriptor pipeIn;
+    private FileDescriptor pipeOut;
+
+    private WritableByteChannel byteChannel;
+
+    protected AbstractRawSocketStreamChannel(Channel parent, int fd) {
+        this(parent, new LinuxSocket(fd));
+    }
+
+    protected AbstractRawSocketStreamChannel(int fd) {
+        this(new LinuxSocket(fd));
+    }
+
+    AbstractRawSocketStreamChannel(LinuxSocket fd) {
+        this(fd, isSoErrorZero(fd));
+    }
+
+    AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd) {
+        super(parent, fd, Native.EPOLLIN, true);
+        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
+        super(parent, fd, Native.EPOLLIN, remote);
+        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    protected AbstractRawSocketStreamChannel(LinuxSocket fd, boolean active) {
+        super(null, fd, Native.EPOLLIN, active);
+        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
+        flags |= Native.EPOLLRDHUP;
+    }
+
+    @Override
+    protected AbstractEpollUnsafe newUnsafe() {
+        return new EpollStreamUnsafe();
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return METADATA;
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+     * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+     * splice until the {@link ChannelFuture} is cancelled or fails.
+     *
+     * Please note:
+     * <ul>
+     *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+     *   {@link IllegalArgumentException} is thrown. </li>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+     *   target {@link AbstractRawSocketStreamChannel}</li>
+     * </ul>
+     *
+     */
+    public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len) {
+        return spliceTo(ch, len, newPromise());
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+     * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+     * splice until the {@link ChannelFuture} is cancelled or fails.
+     *
+     * Please note:
+     * <ul>
+     *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+     *   {@link IllegalArgumentException} is thrown. </li>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+     *   target {@link AbstractRawSocketStreamChannel}</li>
+     * </ul>
+     *
+     */
+    public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len,
+                                        final ChannelPromise promise) {
+        if (ch.eventLoop() != eventLoop()) {
+            throw new IllegalArgumentException("EventLoops are not the same.");
+        }
+        if (len < 0) {
+            throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+        }
+        if (ch.config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED
+                || config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED) {
+            throw new IllegalStateException("spliceTo() supported only when using " + RawSocketMode.LEVEL_TRIGGERED);
+        }
+        checkNotNull(promise, "promise");
+        if (!isOpen()) {
+            promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+        } else {
+            addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
+            failSpliceIfClosed(promise);
+        }
+        return promise;
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+     * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+     * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+     * {@link ChannelFuture} is cancelled or fails.
+     *
+     * Please note:
+     * <ul>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+     *   {@link AbstractRawSocketStreamChannel}</li>
+     *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
+     *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+     * </ul>
+     */
+    public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
+        return spliceTo(ch, offset, len, newPromise());
+    }
+
+    /**
+     * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+     * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+     * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+     * {@link ChannelFuture} is cancelled or fails.
+     *
+     * Please note:
+     * <ul>
+     *   <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+     *   {@link AbstractRawSocketStreamChannel}</li>
+     *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
+     *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+     * </ul>
+     */
+    public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
+                                        final ChannelPromise promise) {
+        if (len < 0) {
+            throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+        }
+        if (offset < 0) {
+            throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
+        }
+        if (config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED) {
+            throw new IllegalStateException("spliceTo() supported only when using " + RawSocketMode.LEVEL_TRIGGERED);
+        }
+        checkNotNull(promise, "promise");
+        if (!isOpen()) {
+            promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+        } else {
+            addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
+            failSpliceIfClosed(promise);
+        }
+        return promise;
+    }
+
+    private void failSpliceIfClosed(ChannelPromise promise) {
+        if (!isOpen()) {
+            // The Channel was closed in the meantime, so try to fail the promise to prevent any
+            // cases where a future may not be notified otherwise.
+            if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
+                eventLoop().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Call this via the EventLoop because the splice queue is an MPSC queue.
+                        clearSpliceQueue();
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Write bytes from the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
+     * @param in the collection which contains objects to write.
+     * @param buf the {@link ByteBuf} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+        int readableBytes = buf.readableBytes();
+        if (readableBytes == 0) {
+            in.remove();
+            return 0;
+        }
+
+        if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
+            return doWriteBytes(in, buf);
+        } else {
+            ByteBuffer[] nioBuffers = buf.nioBuffers();
+            return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
+                    config().getMaxBytesPerGatheringWrite());
+        }
+    }
+
+    private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
+        // By default we track the SO_SNDBUF whenever it is explicitly set. However some OSes may dynamically change
+        // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should
+        // make a best effort to adjust as OS behavior changes.
+        if (attempted == written) {
+            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
+                config().setMaxBytesPerGatheringWrite(attempted << 1);
+            }
+        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
+            config().setMaxBytesPerGatheringWrite(attempted >>> 1);
+        }
+    }
+
+    /**
+     * Write multiple bytes via {@link IovArray}.
+     * @param in the collection which contains objects to write.
+     * @param array The array which contains the content to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws IOException If an I/O exception occurs during write.
+     */
+    private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
+        final long expectedWrittenBytes = array.size();
+        assert expectedWrittenBytes != 0;
+        final int cnt = array.count();
+        assert cnt != 0;
+
+        final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
+        if (localWrittenBytes > 0) {
+            adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
+            in.removeBytes(localWrittenBytes);
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write multiple bytes via {@link ByteBuffer} array.
+     * @param in the collection which contains objects to write.
+     * @param nioBuffers The buffers to write.
+     * @param nioBufferCnt The number of buffers to write.
+     * @param expectedWrittenBytes The number of bytes we expect to write.
+     * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws IOException If an I/O exception occurs during write.
+     */
+    private int writeBytesMultiple(
+        ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
+        long maxBytesPerGatheringWrite) throws IOException {
+        assert expectedWrittenBytes != 0;
+        if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
+            expectedWrittenBytes = maxBytesPerGatheringWrite;
+        }
+
+        final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
+        if (localWrittenBytes > 0) {
+            adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
+            in.removeBytes(localWrittenBytes);
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write a {@link DefaultFileRegion}
+     * @param in the collection which contains objects to write.
+     * @param region the {@link DefaultFileRegion} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
+        final long regionCount = region.count();
+        if (region.transferred() >= regionCount) {
+            in.remove();
+            return 0;
+        }
+
+        final long offset = region.transferred();
+        final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
+        if (flushedAmount > 0) {
+            in.progress(flushedAmount);
+            if (region.transferred() >= regionCount) {
+                in.remove();
+            }
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    /**
+     * Write a {@link FileRegion}
+     * @param in the collection which contains objects to write.
+     * @param region the {@link FileRegion} from which the bytes should be written
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     */
+    private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
+        if (region.transferred() >= region.count()) {
+            in.remove();
+            return 0;
+        }
+
+        if (byteChannel == null) {
+            byteChannel = new RawSocketSocketWritableByteChannel();
+        }
+        final long flushedAmount = region.transferTo(byteChannel, region.transferred());
+        if (flushedAmount > 0) {
+            in.progress(flushedAmount);
+            if (region.transferred() >= region.count()) {
+                in.remove();
+            }
+            return 1;
+        }
+        return WRITE_STATUS_SNDBUF_FULL;
+    }
+
+    @Override
+    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+        int writeSpinCount = config().getWriteSpinCount();
+        do {
+            final int msgCount = in.size();
+            // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
+            if (msgCount > 1 && in.current() instanceof ByteBuf) {
+                writeSpinCount -= doWriteMultiple(in);
+            } else if (msgCount == 0) {
+                // Wrote all messages.
+                clearFlag(Native.EPOLLOUT);
+                // Return here so we do not set the EPOLLOUT flag.
+                return;
+            } else {  // msgCount == 1
+                writeSpinCount -= doWriteSingle(in);
+            }
+
+            // We do not break the loop here even if the outbound buffer was flushed completely,
+            // because a user might have triggered another write and flush when we notify his or her
+            // listeners.
+        } while (writeSpinCount > 0);
+
+        if (writeSpinCount == 0) {
+            // We used our writeSpin quantum, and should try to write again later.
+            eventLoop().execute(flushTask);
+        } else {
+            // Underlying descriptor cannot accept all data currently, so set the EPOLLOUT flag to be woken up
+            // when it can accept more data.
+            setFlag(Native.EPOLLOUT);
+        }
+    }
+
+    /**
+     * Attempt to write a single object.
+     * @param in the collection which contains objects to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws Exception If an I/O error occurs.
+     */
+    protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
+        // The outbound buffer contains only one message or it contains a file region.
+        Object msg = in.current();
+        if (msg instanceof ByteBuf) {
+            return writeBytes(in, (ByteBuf) msg);
+        } else if (msg instanceof DefaultFileRegion) {
+            return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
+        } else if (msg instanceof FileRegion) {
+            return writeFileRegion(in, (FileRegion) msg);
+        } else if (msg instanceof SpliceOutTask) {
+            if (!((SpliceOutTask) msg).spliceOut()) {
+                return WRITE_STATUS_SNDBUF_FULL;
+            }
+            in.remove();
+            return 1;
+        } else {
+            // Should never reach here.
+            throw new Error();
+        }
+    }
+
+    /**
+     * Attempt to write multiple {@link ByteBuf} objects.
+     * @param in the collection which contains objects to write.
+     * @return The value that should be decremented from the write quantum which starts at
+     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+     * <ul>
+     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+     *     is encountered</li>
+     *     <li>1 - if a single call to write data was made to the OS</li>
+     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+     *     no data was accepted</li>
+     * </ul>
+     * @throws Exception If an I/O error occurs.
+     */
+    private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
+        final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
+        if (PlatformDependent.hasUnsafe()) {
+            IovArray array = ((RawSocketEventLoop) eventLoop()).cleanArray();
+            array.maxBytes(maxBytesPerGatheringWrite);
+            in.forEachFlushedMessage(array);
+
+            if (array.count() >= 1) {
+                // TODO: Handle the case where cnt == 1 specially.
+                return writeBytesMultiple(in, array);
+            }
+        } else {
+            ByteBuffer[] buffers = in.nioBuffers();
+            int cnt = in.nioBufferCount();
+            if (cnt >= 1) {
+                // TODO: Handle the case where cnt == 1 specially.
+                return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
+            }
+        }
+        // cnt == 0, which means the outbound buffer contained empty buffers only.
+        in.removeBytes(0);
+        return 0;
+    }
+
+    @Override
+    protected Object filterOutboundMessage(Object msg) {
+        if (msg instanceof ByteBuf) {
+            ByteBuf buf = (ByteBuf) msg;
+            return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
+        }
+
+        if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
+            return msg;
+        }
+
+        throw new UnsupportedOperationException(
+                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
+    }
+
+    @UnstableApi
+    @Override
+    protected final void doShutdownOutput() throws Exception {
+        socket.shutdown(false, true);
+    }
+
+    private void shutdownInput0(final ChannelPromise promise) {
+        try {
+            socket.shutdown(true, false);
+            promise.setSuccess();
+        } catch (Throwable cause) {
+            promise.setFailure(cause);
+        }
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+        return socket.isOutputShutdown();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+        return socket.isInputShutdown();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return socket.isShutdown();
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput() {
+        return shutdownOutput(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput(final ChannelPromise promise) {
+        EventLoop loop = eventLoop();
+        if (loop.inEventLoop()) {
+            ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+        } else {
+            loop.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+                }
+            });
+        }
+
+        return promise;
+    }
+
+    @Override
+    public ChannelFuture shutdownInput() {
+        return shutdownInput(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdownInput(final ChannelPromise promise) {
+        Executor closeExecutor = ((RawSocketStreamUnsafe) unsafe()).prepareToClose();
+        if (closeExecutor != null) {
+            closeExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    shutdownInput0(promise);
+                }
+            });
+        } else {
+            EventLoop loop = eventLoop();
+            if (loop.inEventLoop()) {
+                shutdownInput0(promise);
+            } else {
+                loop.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        shutdownInput0(promise);
+                    }
+                });
+            }
+        }
+        return promise;
+    }
+
+    @Override
+    public ChannelFuture shutdown() {
+        return shutdown(newPromise());
+    }
+
+    @Override
+    public ChannelFuture shutdown(final ChannelPromise promise) {
+        ChannelFuture shutdownOutputFuture = shutdownOutput();
+        if (shutdownOutputFuture.isDone()) {
+            shutdownOutputDone(shutdownOutputFuture, promise);
+        } else {
+            shutdownOutputFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
+                    shutdownOutputDone(shutdownOutputFuture, promise);
+                }
+            });
+        }
+        return promise;
+    }
+
+    private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
+        ChannelFuture shutdownInputFuture = shutdownInput();
+        if (shutdownInputFuture.isDone()) {
+            shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+        } else {
+            shutdownInputFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
+                    shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+                }
+            });
+        }
+    }
+
+    private static void shutdownDone(ChannelFuture shutdownOutputFuture,
+                              ChannelFuture shutdownInputFuture,
+                              ChannelPromise promise) {
+        Throwable shutdownOutputCause = shutdownOutputFuture.cause();
+        Throwable shutdownInputCause = shutdownInputFuture.cause();
+        if (shutdownOutputCause != null) {
+            if (shutdownInputCause != null) {
+                logger.debug("Exception suppressed because a previous exception occurred.",
+                        shutdownInputCause);
+            }
+            promise.setFailure(shutdownOutputCause);
+        } else if (shutdownInputCause != null) {
+            promise.setFailure(shutdownInputCause);
+        } else {
+            promise.setSuccess();
+        }
+    }
+
+    @Override
+    protected void doClose() throws Exception {
+        try {
+            // Calling super.doClose() first so spliceTo(...) will fail on next call.
+            super.doClose();
+        } finally {
+            safeClosePipe(pipeIn);
+            safeClosePipe(pipeOut);
+            clearSpliceQueue();
+        }
+    }
+
+    private void clearSpliceQueue() {
+        if (spliceQueue == null) {
+            return;
+        }
+        for (;;) {
+            SpliceInTask task = spliceQueue.poll();
+            if (task == null) {
+                break;
+            }
+            task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
+        }
+    }
+
+    private static void safeClosePipe(FileDescriptor fd) {
+        if (fd != null) {
+            try {
+                fd.close();
+            } catch (IOException e) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Error while closing a pipe", e);
+                }
+            }
+        }
+    }
+
+    class RawSocketStreamUnsafe extends AbstractRawSocketUnsafe {
+        // Overridden here just to be able to access this method from AbstractRawSocketStreamChannel
+        @Override
+        protected Executor prepareToClose() {
+            return super.prepareToClose();
+        }
+
+        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
+                                         RawSocketRecvByteAllocatorHandle allocHandle) {
+            if (byteBuf != null) {
+                if (byteBuf.isReadable()) {
+                    readPending = false;
+                    pipeline.fireChannelRead(byteBuf);
+                } else {
+                    byteBuf.release();
+                }
+            }
+            allocHandle.readComplete();
+            pipeline.fireChannelReadComplete();
+            pipeline.fireExceptionCaught(cause);
+            if (close || cause instanceof IOException) {
+                shutdownInput(false);
+            }
+        }
+
+        @Override
+        RawSocketRecvByteAllocatorHandle newRawSocketHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+            return new RawSocketRecvByteAllocatorStreamingHandle(handle);
+        }
+
+        @Override
+        void RawSocketInReady() {
+            final ChannelConfig config = config();
+            if (shouldBreakRawSocketInReady(config)) {
+                clearRawSocketIn0();
+                return;
+            }
+            final RawSocketRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+            allocHandle.edgeTriggered(isFlagSet(Native.RawSocketET));
+
+            final ChannelPipeline pipeline = pipeline();
+            final ByteBufAllocator allocator = config.getAllocator();
+            allocHandle.reset(config);
+            RawSocketInBefore();
+
+            ByteBuf byteBuf = null;
+            boolean close = false;
+            try {
+                do {
+                    if (spliceQueue != null) {
+                        SpliceInTask spliceTask = spliceQueue.peek();
+                        if (spliceTask != null) {
+                            if (spliceTask.spliceIn(allocHandle)) {
+                                // Only remove the task if the channel is still active; otherwise doClose(...)
+                                // has already cleared all SpliceTasks from the queue.
+                                if (isActive()) {
+                                    spliceQueue.remove();
+                                }
+                                continue;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+
+                    // Use a direct buffer here, as the native implementation is only able
+                    // to handle direct buffers.
+                    byteBuf = allocHandle.allocate(allocator);
+                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
+                    if (allocHandle.lastBytesRead() <= 0) {
+                        // nothing was read, release the buffer.
+                        byteBuf.release();
+                        byteBuf = null;
+                        close = allocHandle.lastBytesRead() < 0;
+                        if (close) {
+                            // There is nothing left to read as we received an EOF.
+                            readPending = false;
+                        }
+                        break;
+                    }
+                    allocHandle.incMessagesRead(1);
+                    readPending = false;
+                    pipeline.fireChannelRead(byteBuf);
+                    byteBuf = null;
+
+                    if (shouldBreakRawSocketInReady(config)) {
+                        // We need to do this for two reasons:
+                        //
+                        // - If the input was shut down in the meantime (which may be the case when the user did it
+                        //   in the fireChannelRead(...) method), we should not try to read again, to avoid
+                        //   producing misleading exceptions.
+                        //
+                        // - If the user closes the channel, we must not try to read from it again, as the file
+                        //   descriptor may already have been reused by the OS if the system is handling a lot of
+                        //   concurrent connections and so needs a lot of file descriptors. Otherwise we risk
+                        //   reading data from a file descriptor that belongs to a different socket than the one
+                        //   that was "wrapped" by this Channel implementation.
+                        break;
+                    }
+                } while (allocHandle.continueReading());
+
+                allocHandle.readComplete();
+                pipeline.fireChannelReadComplete();
+
+                if (close) {
+                    shutdownInput(false);
+                }
+            } catch (Throwable t) {
+                handleReadException(pipeline, byteBuf, t, close, allocHandle);
+            } finally {
+                RawSocketInFinally(config);
+            }
+        }
+    }
+
+    private void addToSpliceQueue(final SpliceInTask task) {
+        EventLoop eventLoop = eventLoop();
+        if (eventLoop.inEventLoop()) {
+            addToSpliceQueue0(task);
+        } else {
+            eventLoop.execute(new Runnable() {
+                @Override
+                public void run() {
+                    addToSpliceQueue0(task);
+                }
+            });
+        }
+    }
+
+    private void addToSpliceQueue0(SpliceInTask task) {
+        if (spliceQueue == null) {
+            spliceQueue = PlatformDependent.newMpscQueue();
+        }
+        spliceQueue.add(task);
+    }
+
+    protected abstract class SpliceInTask {
+        final ChannelPromise promise;
+        int len;
+
+        protected SpliceInTask(int len, ChannelPromise promise) {
+            this.promise = promise;
+            this.len = len;
+        }
+
+        abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
+
+        protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
+            // calculate the maximum amount of data we are allowed to splice
+            int length = Math.min(handle.guess(), len);
+            int splicedIn = 0;
+            for (;;) {
+                // Splicing until there is nothing left to splice.
+                int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
+                if (localSplicedIn == 0) {
+                    break;
+                }
+                splicedIn += localSplicedIn;
+                length -= localSplicedIn;
+            }
+
+            return splicedIn;
+        }
+    }
+
+    // Let it directly implement ChannelFutureListener as well to reduce object creation.
+    private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
+        private final AbstractRawSocketStreamChannel ch;
+
+        SpliceInChannelTask(AbstractRawSocketStreamChannel ch, int len, ChannelPromise promise) {
+            super(len, promise);
+            this.ch = ch;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                promise.setFailure(future.cause());
+            }
+        }
+
+        @Override
+        public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+            assert ch.eventLoop().inEventLoop();
+            if (len == 0) {
+                promise.setSuccess();
+                return true;
+            }
+            try {
+                // We create the pipe on the target channel, as this allows us to handle pending writes
+                // later in the correct order without running into ordering issues when spliceTo(...) is called
+                // on multiple Channels pointing to one target Channel.
+                FileDescriptor pipeOut = ch.pipeOut;
+                if (pipeOut == null) {
+                    // Create a new pipe as none was created before.
+                    FileDescriptor[] pipe = pipe();
+                    ch.pipeIn = pipe[0];
+                    pipeOut = ch.pipeOut = pipe[1];
+                }
+
+                int splicedIn = spliceIn(pipeOut, handle);
+                if (splicedIn > 0) {
+                    // Integer.MAX_VALUE is a special value which results in splicing forever.
+                    if (len != Integer.MAX_VALUE) {
+                        len -= splicedIn;
+                    }
+
+                    // Depending on whether we are done splicing inbound data, we pick the promise for the
+                    // outbound splicing.
+                    final ChannelPromise splicePromise;
+                    if (len == 0) {
+                        splicePromise = promise;
+                    } else {
+                        splicePromise = ch.newPromise().addListener(this);
+                    }
+
+                    boolean autoRead = config().isAutoRead();
+
+                    // Just call unsafe().write(...) and flush(), as we do not want to traverse the whole pipeline
+                    // in this case.
+                    ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
+                    ch.unsafe().flush();
+                    if (autoRead && !splicePromise.isDone()) {
+                        // Write was not done which means the target channel was not writable. In this case we need to
+                        // disable reading until we are done with splicing to the target channel because:
+                        //
+                        // - The user may want to trigger another splice operation once the splicing is complete.
+                        config().setAutoRead(false);
+                    }
+                }
+
+                return len == 0;
+            } catch (Throwable cause) {
+                promise.setFailure(cause);
+                return true;
+            }
+        }
+    }
+
+    private final class SpliceOutTask {
+        private final AbstractRawSocketStreamChannel ch;
+        private final boolean autoRead;
+        private int len;
+
+        SpliceOutTask(AbstractRawSocketStreamChannel ch, int len, boolean autoRead) {
+            this.ch = ch;
+            this.len = len;
+            this.autoRead = autoRead;
+        }
+
+        public boolean spliceOut() throws Exception {
+            assert ch.eventLoop().inEventLoop();
+            try {
+                int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
+                len -= splicedOut;
+                if (len == 0) {
+                    if (autoRead) {
+                        // AutoRead was used and we spliced everything so start reading again
+                        config().setAutoRead(true);
+                    }
+                    return true;
+                }
+                return false;
+            } catch (IOException e) {
+                if (autoRead) {
+                    // AutoRead was used and splicing failed, so start reading again before rethrowing.
+                    config().setAutoRead(true);
+                }
+                throw e;
+            }
+        }
+    }
+
+    private final class SpliceFdTask extends SpliceInTask {
+        private final FileDescriptor fd;
+        private final ChannelPromise promise;
+        private final int offset;
+
+        SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
+            super(len, promise);
+            this.fd = fd;
+            this.promise = promise;
+            this.offset = offset;
+        }
+
+        @Override
+        public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+            assert eventLoop().inEventLoop();
+            if (len == 0) {
+                promise.setSuccess();
+                return true;
+            }
+
+            try {
+                FileDescriptor[] pipe = pipe();
+                FileDescriptor pipeIn = pipe[0];
+                FileDescriptor pipeOut = pipe[1];
+                try {
+                    int splicedIn = spliceIn(pipeOut, handle);
+                    if (splicedIn > 0) {
+                        // Integer.MAX_VALUE is a special value which results in splicing forever.
+                        if (len != Integer.MAX_VALUE) {
+                            len -= splicedIn;
+                        }
+                        do {
+                            int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
+                            splicedIn -= splicedOut;
+                        } while (splicedIn > 0);
+                        if (len == 0) {
+                            promise.setSuccess();
+                            return true;
+                        }
+                    }
+                    return false;
+                } finally {
+                    safeClosePipe(pipeIn);
+                    safeClosePipe(pipeOut);
+                }
+            } catch (Throwable cause) {
+                promise.setFailure(cause);
+                return true;
+            }
+        }
+    }
+
+    private final class RawSocketSocketWritableByteChannel extends SocketWritableByteChannel {
+        RawSocketSocketWritableByteChannel() {
+            super(socket);
+        }
+
+        @Override
+        protected ByteBufAllocator alloc() {
+            return AbstractRawSocketStreamChannel.this.alloc();
+        }
+    }
+}
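The shutdown(...), shutdownOutputDone(...) and shutdownDone(...) methods in the file above chain two asynchronous steps: the output side is shut down first, the input side only once the output future is done, and if both fail the output cause is reported while the input cause is merely logged. A minimal sketch of that ordering and precedence, using only the JDK's CompletableFuture in place of Netty's ChannelFuture/ChannelPromise, could look as follows. The class name ShutdownOrderSketch and the helpers shutdown and failed are hypothetical and not part of this commit.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ShutdownOrderSketch {

    /** Hypothetical helper: a future that has already failed with the given cause. */
    static CompletableFuture<Void> failed(Throwable cause) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /**
     * Shuts down output first, then input, and completes the returned future
     * with the first failure (output before input), or normally if both succeed.
     */
    static CompletableFuture<Void> shutdown(Supplier<CompletableFuture<Void>> shutdownOutput,
                                            Supplier<CompletableFuture<Void>> shutdownInput) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        shutdownOutput.get().whenComplete((ignoredOut, outputCause) ->
            // The input side is only touched once the output future is done,
            // regardless of whether the output shutdown succeeded.
            shutdownInput.get().whenComplete((ignoredIn, inputCause) -> {
                if (outputCause != null) {
                    // An input failure, if any, is dropped in favour of the earlier one.
                    promise.completeExceptionally(outputCause);
                } else if (inputCause != null) {
                    promise.completeExceptionally(inputCause);
                } else {
                    promise.complete(null);
                }
            }));
        return promise;
    }

    public static void main(String[] args) {
        IOException outputError = new IOException("output");
        IOException inputError = new IOException("input");

        CompletableFuture<Void> both =
            shutdown(() -> failed(outputError), () -> failed(inputError));
        // The output cause wins when both sides fail.
        System.out.println(both.handle((v, t) -> t).join() == outputError);

        CompletableFuture<Void> ok =
            shutdown(() -> CompletableFuture.completedFuture(null),
                     () -> CompletableFuture.completedFuture(null));
        System.out.println(ok.isDone() && !ok.isCompletedExceptionally());
    }
}
```

The sketch leaves out the event-loop hand-off and the prepareToClose() executor used by the real shutdownInput(...); it only illustrates the sequencing and which cause ends up on the promise.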
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
index 8ac21dd..6f4b76a 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
@@ -1,186 +1,118 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
 package org.apache.plc4x.java.utils.rawsockets.netty;
 
-import com.savarese.rocksaw.net.RawSocket;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.nio.AbstractNioByteChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.logging.InternalLogger;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectableChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
 
-/**
- * Netty channel implementation that uses RockSaw to create a raw socket connection to implement
- * IP-socket based protocols not based on TCP or UDP.
- *
- * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
- */
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannel extends AbstractRawSocketStreamChannel implements SocketChannel {
 
-    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
+    private final RawSocketChannelConfig config;
 
-    // The protocol number is defined in the IP protocol and indicates the type of protocol the payload
-    // the IP packet uses. This number is assigned by the IESG. A full list of the registered protocol
-    // numbers can be found here: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
-    private int protocolNumber;
+    private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
 
-    private RawSocket socket;
-    private InetSocketAddress localAddress;
-    private InetSocketAddress remoteAddress;
-
-    /**
-     * Initializes a raw socket that is able to communicate with raw IPv4 and IPv6 sockets, hereby
-     * allowing to implement protocols below TCP and UDP.
-     *
-     * For a list of public known protocol numbers see:
-     * https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
-     *
-     * @param parent
-     * @param ch
-     * @param protocolNumber protocol number identifying the protocol.
-     * @throws PlcIoException
-     */
-    public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
-        super(parent, ch);
-
-        this.protocolNumber = protocolNumber;
-
-        try {
-            socket = new RawSocket();
-            socket.setIPHeaderInclude(true);
-        } catch (IOException e) {
-            throw new PlcIoException("Error setting up raw socket", e);
-        }
+    public RawSocketChannel() {
+        super(newSocketStream(), false);
+        config = new RawSocketChannelConfig(this);
     }
 
-    /**
-     * Opens a connection to the given remote address.
-     *
-     * @param remoteAddress
-     * @param localAddress
-     * @return
-     * @throws Exception
-     */
-    @Override
-    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
-        if(!(remoteAddress instanceof InetSocketAddress) || !(localAddress instanceof InetSocketAddress)) {
-            throw new PlcIoException("Both remoteAddress and localAddress must be of type InetSocketAddress");
-        }
-
-        try {
-            this.localAddress = (InetSocketAddress) localAddress;
-            this.remoteAddress = (InetSocketAddress) remoteAddress;
-
-            socket.open(RawSocket.PF_INET, protocolNumber);
+    public RawSocketChannel(int fd) {
+        super(fd);
+        config = new RawSocketChannelConfig(this);
+    }
 
-            return socket.isOpen();
-        } catch (IllegalStateException | IOException e) {
-            return false;
-        }
+    RawSocketChannel(LinuxSocket fd, boolean active) {
+        super(fd, active);
+        config = new RawSocketChannelConfig(this);
     }
 
-    @Override
-    protected void doFinishConnect() throws Exception {
+    RawSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
+        super(parent, fd, remoteAddress);
+        config = new RawSocketChannelConfig(this);
 
+        if (parent instanceof RawSocketChannel) {
+            tcpMd5SigAddresses = ((RawSocketChannel) parent).tcpMd5SigAddresses();
+        }
     }
 
     /**
-     * Opens a listening socket.
-     *
-     * @param localAddress
-     * @throws Exception
+     * Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
      */
-    @Override
-    protected void doBind(SocketAddress localAddress) throws Exception {
-        if(socket.isOpen()) {
-            throw new PlcIoException("Raw socket already opened.");
-        }
-        if(localAddress instanceof InetSocketAddress) {
-            this.localAddress = (InetSocketAddress) localAddress;
-            socket.bind(this.localAddress.getAddress());
-        } else {
-            throw new PlcIoException("Unsupported type of local address. Only InetSocketAddress supported.");
-        }
+    public RawSocketTcpInfo tcpInfo() {
+        return tcpInfo(new RawSocketTcpInfo());
     }
 
     /**
-     * Closes the connection.
-     *
-     * @throws Exception
+     * Updates and returns the {@code TCP_INFO} for the current socket.
+     * See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
      */
-    @Override
-    protected void doDisconnect() throws Exception {
-        if(socket.isOpen()) {
-            socket.close();
+    public RawSocketTcpInfo tcpInfo(RawSocketTcpInfo info) {
+        try {
+            socket.getTcpInfo(info);
+            return info;
+        } catch (IOException e) {
+            throw new ChannelException(e);
         }
     }
 
     @Override
-    protected ChannelFuture shutdownInput() {
-        return null;
-    }
-
-    @Override
-    protected int doReadBytes(ByteBuf buf) throws Exception {
-        byte[] byteBuf = new byte[1024];
-        int readBytes = socket.read(byteBuf);
-        buf.writeBytes(byteBuf, 0, readBytes);
-        return readBytes;
+    public InetSocketAddress remoteAddress() {
+        return (InetSocketAddress) super.remoteAddress();
     }
 
     @Override
-    protected int doWriteBytes(ByteBuf buf) throws Exception {
-        byte[] readableBytes = new byte[buf.readableBytes()];
-        buf.readBytes(readableBytes);
-        socket.write(remoteAddress.getAddress(), readableBytes);
-        return readableBytes.length;
+    public InetSocketAddress localAddress() {
+        return (InetSocketAddress) super.localAddress();
     }
 
     @Override
-    protected long doWriteFileRegion(FileRegion region) throws Exception {
-        throw new UnsupportedOperationException("doWriteFileRegion not implemented");
+    public RawSocketChannelConfig config() {
+        return config;
     }
 
     @Override
-    protected SocketAddress localAddress0() {
-        return localAddress;
+    public ServerSocketChannel parent() {
+        return (ServerSocketChannel) super.parent();
     }
 
     @Override
-    protected SocketAddress remoteAddress0() {
-        return remoteAddress;
+    protected AbstractEpollUnsafe newUnsafe() {
+        return new EpollSocketChannelUnsafe();
     }
 
-    @Override
-    public ChannelConfig config() {
-        return null;
+    private final class EpollSocketChannelUnsafe extends RawSocketStreamUnsafe {
+        @Override
+        protected Executor prepareToClose() {
+            try {
+                // Check isOpen() first, as otherwise calling getSoLinger() throws a RuntimeException
+                // because the fd is not valid anymore.
+                if (isOpen() && config().getSoLinger() > 0) {
+                    // We need to cancel this key of the channel so we do not end up in an event loop spin
+                    // because we try to read or write until the actual close happens, which may be later due
+                    // to SO_LINGER handling.
+                    // See https://github.com/netty/netty/issues/4449
+                    ((RawSocketEventLoop) eventLoop()).remove(RawSocketChannel.this);
+                    return GlobalEventExecutor.INSTANCE;
+                }
+            } catch (Throwable ignore) {
+                // Ignore the error as the underlying channel may be closed in the meantime and so
+                // getSoLinger() may produce an exception. In this case we just return null.
+                // See https://github.com/netty/netty/issues/4449
+            }
+            return null;
+        }
     }
 
-    @Override
-    public boolean isActive() {
-        return socket.isOpen();
+    void setTcpMd5Sig(Map<InetAddress, byte[]> keys) throws IOException {
+        tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
     }
 }
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
new file mode 100644
index 0000000..8f77972
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static io.netty.channel.unix.Limits.SSIZE_MAX;
+
+public class RawSocketChannelConfig extends DefaultChannelConfig {
+    final AbstractRawSocketChannel channel;
+    private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
+
+    RawSocketChannelConfig(AbstractRawSocketChannel channel) {
+        super(channel);
+        this.channel = channel;
+    }
+
+    @Override
+    public Map<ChannelOption<?>, Object> getOptions() {
+        return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getOption(ChannelOption<T> option) {
+        if (option == EpollChannelOption.EPOLL_MODE) {
+            return (T) getEpollMode();
+        }
+        return super.getOption(option);
+    }
+
+    @Override
+    public <T> boolean setOption(ChannelOption<T> option, T value) {
+        validate(option, value);
+        if (option == EpollChannelOption.EPOLL_MODE) {
+            setEpollMode((EpollMode) value);
+        } else {
+            return super.setOption(option, value);
+        }
+        return true;
+    }
+
+    @Override
+    public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
+        super.setConnectTimeoutMillis(connectTimeoutMillis);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
+        super.setMaxMessagesPerRead(maxMessagesPerRead);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
+        super.setWriteSpinCount(writeSpinCount);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
+        super.setAllocator(allocator);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
+        if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
+            throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
+                    RecvByteBufAllocator.ExtendedHandle.class);
+        }
+        super.setRecvByteBufAllocator(allocator);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setAutoRead(boolean autoRead) {
+        super.setAutoRead(autoRead);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
+        return this;
+    }
+
+    @Override
+    @Deprecated
+    public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
+        super.setWriteBufferWaterMark(writeBufferWaterMark);
+        return this;
+    }
+
+    @Override
+    public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
+        super.setMessageSizeEstimator(estimator);
+        return this;
+    }
+
+    /**
+     * Return the {@link EpollMode} used. Default is
+     * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+     * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+     * {@link EpollMode#LEVEL_TRIGGERED}.
+     */
+    public EpollMode getEpollMode() {
+        return channel.isFlagSet(Native.EPOLLET)
+                ? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
+    }
+
+    /**
+     * Set the {@link EpollMode} used. Default is
+     * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+     * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+     * {@link EpollMode#LEVEL_TRIGGERED}.
+     *
+     * <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
+     */
+    public EpollChannelConfig setEpollMode(EpollMode mode) {
+        if (mode == null) {
+            throw new NullPointerException("mode");
+        }
+        try {
+            switch (mode) {
+            case EDGE_TRIGGERED:
+                checkChannelNotRegistered();
+                channel.setFlag(Native.EPOLLET);
+                break;
+            case LEVEL_TRIGGERED:
+                checkChannelNotRegistered();
+                channel.clearFlag(Native.EPOLLET);
+                break;
+            default:
+                throw new Error();
+            }
+        } catch (IOException e) {
+            throw new ChannelException(e);
+        }
+        return this;
+    }
+
+    private void checkChannelNotRegistered() {
+        if (channel.isRegistered()) {
+            throw new IllegalStateException("EpollMode can only be changed before channel is registered");
+        }
+    }
+
+    @Override
+    protected final void autoReadCleared() {
+        channel.clearEpollIn();
+    }
+
+    final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
+        this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
+    }
+
+    final long getMaxBytesPerGatheringWrite() {
+        return maxBytesPerGatheringWrite;
+    }
+}
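
Reviewer note on getEpollMode()/setEpollMode() above: the trigger mode is stored as a single bit in the channel's flag word, and it may only change before the channel is registered. A minimal sketch of that idea follows. The class name, the `registered` field and the initial `EPOLLIN | EPOLLET` value are assumptions made for illustration; this is not the PLC4X or Netty implementation. The constant values are the Linux ones from <sys/epoll.h>.

```java
// Hypothetical stand-in for the channel's flag handling (illustration only).
final class TriggerModeSketch {
    // Linux values from <sys/epoll.h>; EPOLLET is (1u << 31), which is Integer.MIN_VALUE as a Java int.
    static final int EPOLLIN = 0x001;
    static final int EPOLLET = 1 << 31;

    // Assumption for this sketch: start readable and edge-triggered (the Javadoc names edge-triggered as default).
    private int flags = EPOLLIN | EPOLLET;
    private boolean registered;

    boolean isFlagSet(int flag) {
        return (flags & flag) != 0;
    }

    // Switch between edge-triggered (bit set) and level-triggered (bit cleared).
    void setEdgeTriggered(boolean edge) {
        if (registered) {
            throw new IllegalStateException("mode can only be changed before the channel is registered");
        }
        flags = edge ? (flags | EPOLLET) : (flags & ~EPOLLET);
    }

    void register() {
        registered = true;
    }

    public static void main(String[] args) {
        TriggerModeSketch channel = new TriggerModeSketch();
        channel.setEdgeTriggered(false);
        System.out.println("edge-triggered: " + channel.isFlagSet(EPOLLET));
        System.out.println("still readable: " + channel.isFlagSet(EPOLLIN));
    }
}
```

Clearing the bit with `flags & ~EPOLLET` leaves every other interest flag untouched, which is why the mode can be toggled without re-deriving the rest of the registration.
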
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
similarity index 97%
copy from plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
copy to plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
index 8ac21dd..cba1ad8 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
@@ -38,7 +38,7 @@ import java.nio.channels.SelectableChannel;
  *
  * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
  */
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannelSav extends AbstractNioByteChannel {
 
     private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
 
@@ -63,7 +63,7 @@ public class RawSocketChannel extends AbstractNioByteChannel {
      * @param protocolNumber protocol number identifying the protocol.
      * @throws PlcIoException
      */
-    public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
+    public RawSocketChannelSav(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
         super(parent, ch);
 
         this.protocolNumber = protocolNumber;
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
new file mode 100644
index 0000000..03e024d
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead.
+ *
+ * typedef union epoll_data {
+ *     void        *ptr;
+ *     int          fd;
+ *     uint32_t     u32;
+ *     uint64_t     u64;
+ * } epoll_data_t;
+ *
+ * struct epoll_event {
+ *     uint32_t     events;      // Epoll events
+ *     epoll_data_t data;        // User data variable
+ * };
+ *
+ * We use {@code fd} of the {@code epoll_data} union to store the actual file descriptor of an
+ * {@link AbstractRawSocketChannel} and so be able to map it later.
+ */
+final class RawSocketEventArray {
+    // Size of the epoll_event struct
+    private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent();
+    // The offset of the data union in the epoll_event struct
+    private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData();
+
+    private long memoryAddress;
+    private int length;
+
+    RawSocketEventArray(int length) {
+        if (length < 1) {
+            throw new IllegalArgumentException("length must be >= 1 but was " + length);
+        }
+        this.length = length;
+        memoryAddress = allocate(length);
+    }
+
+    private static long allocate(int length) {
+        return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE);
+    }
+
+    /**
+     * Return the {@code memoryAddress} which points to the start of this {@link RawSocketEventArray}.
+     */
+    long memoryAddress() {
+        return memoryAddress;
+    }
+
+    /**
+     * Return the length of the {@link RawSocketEventArray} which represents the maximum number of {@code epoll_events}
+     * that can be stored in it.
+     */
+    int length() {
+        return length;
+    }
+
+    /**
+     * Increase the storage of this {@link RawSocketEventArray}.
+     */
+    void increase() {
+        // double the size
+        length <<= 1;
+        free();
+        memoryAddress = allocate(length);
+    }
+
+    /**
+     * Free this {@link RawSocketEventArray}. Any usage after calling this method may segfault the JVM!
+     */
+    void free() {
+        PlatformDependent.freeMemory(memoryAddress);
+    }
+
+    /**
+     * Return the events for the {@code epoll_event} on this index.
+     */
+    int events(int index) {
+        return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE);
+    }
+
+    /**
+     * Return the file descriptor for the {@code epoll_event} on this index.
+     */
+    int fd(int index) {
+        return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET);
+    }
+}
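
Reviewer note on events(int) and fd(int) above: both read an int at `base + index * EPOLL_EVENT_SIZE`, with fd(int) adding the offset of the data union. The sketch below reproduces that index arithmetic on a direct ByteBuffer instead of raw memory. The size of 12 bytes and data offset of 4 are assumptions that match the packed x86-64 Linux layout; the real class obtains both values from native code. The class and the put(...) helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration of the epoll_event array layout; not the class from the diff.
final class EventArraySketch {
    // Assumed values for the packed x86-64 layout: uint32_t events followed directly by the 8-byte union.
    static final int EVENT_SIZE = 12;
    static final int DATA_OFFSET = 4;

    private final ByteBuffer memory;

    EventArraySketch(int length) {
        if (length < 1) {
            throw new IllegalArgumentException("length must be >= 1 but was " + length);
        }
        memory = ByteBuffer.allocateDirect(length * EVENT_SIZE).order(ByteOrder.nativeOrder());
    }

    // Test helper standing in for what epoll_wait would write into the array.
    void put(int index, int events, int fd) {
        memory.putInt(index * EVENT_SIZE, events);
        memory.putInt(index * EVENT_SIZE + DATA_OFFSET, fd);
    }

    int events(int index) {
        return memory.getInt(index * EVENT_SIZE);
    }

    int fd(int index) {
        return memory.getInt(index * EVENT_SIZE + DATA_OFFSET);
    }

    public static void main(String[] args) {
        EventArraySketch array = new EventArraySketch(2);
        array.put(0, 0x001, 7);
        array.put(1, 0x004, 9);
        System.out.println("entry 1: events=" + array.events(1) + " fd=" + array.fd(1));
    }
}
```
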
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
new file mode 100644
index 0000000..ac13e50
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SelectStrategy;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.util.IntSupplier;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.IntObjectMap;
+import io.netty.util.concurrent.RejectedExecutionHandler;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static java.lang.Math.min;
+
+/**
+ * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
+ */
+final class RawSocketEventLoop extends SingleThreadEventLoop {
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RawSocketEventLoop.class);
+    private static final AtomicIntegerFieldUpdater<RawSocketEventLoop> WAKEN_UP_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(RawSocketEventLoop.class, "wakenUp");
+
+    static {
+        // Ensure JNI is initialized by the time this class is loaded!
+        // We use unix-common methods in this class which are backed by JNI methods.
+        Epoll.ensureAvailability();
+    }
+
+    private final FileDescriptor epollFd;
+    private final FileDescriptor eventFd;
+    private final FileDescriptor timerFd;
+    private final IntObjectMap<AbstractRawSocketChannel> channels = new IntObjectHashMap<AbstractRawSocketChannel>(4096);
+    private final boolean allowGrowing;
+    private final RawSocketEventArray events;
+    private final IovArray iovArray = new IovArray();
+    private final SelectStrategy selectStrategy;
+    private final IntSupplier selectNowSupplier = new IntSupplier() {
+        @Override
+        public int get() throws Exception {
+            return epollWaitNow();
+        }
+    };
+    private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
+        @Override
+        public Integer call() throws Exception {
+            return RawSocketEventLoop.super.pendingTasks();
+        }
+    };
+    private volatile int wakenUp;
+    private volatile int ioRatio = 50;
+
+    RawSocketEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
+                       SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
+        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
+        selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
+        if (maxEvents == 0) {
+            allowGrowing = true;
+            events = new RawSocketEventArray(4096);
+        } else {
+            allowGrowing = false;
+            events = new RawSocketEventArray(maxEvents);
+        }
+        boolean success = false;
+        FileDescriptor epollFd = null;
+        FileDescriptor eventFd = null;
+        FileDescriptor timerFd = null;
+        try {
+            this.epollFd = epollFd = Native.newEpollCreate();
+            this.eventFd = eventFd = Native.newEventFd();
+            try {
+                Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
+            }
+            this.timerFd = timerFd = Native.newTimerFd();
+            try {
+                Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
+            }
+            success = true;
+        } finally {
+            if (!success) {
+                if (epollFd != null) {
+                    try {
+                        epollFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+                if (eventFd != null) {
+                    try {
+                        eventFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+                if (timerFd != null) {
+                    try {
+                        timerFd.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
+     */
+    IovArray cleanArray() {
+        iovArray.clear();
+        return iovArray;
+    }
+
+    @Override
+    protected void wakeup(boolean inEventLoop) {
+        if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
+            // write to the evfd which will then wake-up epoll_wait(...)
+            Native.eventFdWrite(eventFd.intValue(), 1L);
+        }
+    }
+
+    /**
+     * Register the given channel with this {@link EventLoop}.
+     */
+    void add(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+        int fd = ch.socket.intValue();
+        Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
+        channels.put(fd, ch);
+    }
+
+    /**
+     * The flags of the given channel were modified, so update the registration.
+     */
+    void modify(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+        Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
+    }
+
+    /**
+     * Deregister the given channel from this {@link EventLoop}.
+     */
+    void remove(AbstractRawSocketChannel ch) throws IOException {
+        assert inEventLoop();
+
+        if (ch.isOpen()) {
+            int fd = ch.socket.intValue();
+            if (channels.remove(fd) != null) {
+                // Remove the fd from the epoll set. This is only needed if it is still open, as otherwise it is
+                // removed automatically once the file descriptor is closed.
+                Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
+            }
+        }
+    }
+
+    @Override
+    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
+        // This event loop never calls takeTask()
+        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
+                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
+    }
+
+    @Override
+    public int pendingTasks() {
+        // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
+        // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
+        // See https://github.com/netty/netty/issues/5297
+        if (inEventLoop()) {
+            return super.pendingTasks();
+        } else {
+            return submit(pendingTasksCallable).syncUninterruptibly().getNow();
+        }
+    }
+    /**
+     * Returns the percentage of the desired amount of time spent for I/O in the event loop.
+     */
+    public int getIoRatio() {
+        return ioRatio;
+    }
+
+    /**
+     * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
+     * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
+     */
+    public void setIoRatio(int ioRatio) {
+        if (ioRatio <= 0 || ioRatio > 100) {
+            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
+        }
+        this.ioRatio = ioRatio;
+    }
+
+    private int epollWait(boolean oldWakeup) throws IOException {
+        // If a task was submitted while wakenUp was 1, it did not get a chance to produce a wakeup event.
+        // So we need to check the task queue again before calling epoll_wait. If we don't, the task might be
+        // delayed until epoll_wait times out, or until the idle timeout if an IdleStateHandler is in the
+        // pipeline.
+        if (oldWakeup && hasTasks()) {
+            return epollWaitNow();
+        }
+
+        long totalDelay = delayNanos(System.nanoTime());
+        int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
+        return Native.epollWait(epollFd, events, timerFd, delaySeconds,
+                (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
+    }
+
+    private int epollWaitNow() throws IOException {
+        return Native.epollWait(epollFd, events, timerFd, 0, 0);
+    }
+
+    @Override
+    protected void run() {
+        for (;;) {
+            try {
+                int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
+                switch (strategy) {
+                    case SelectStrategy.CONTINUE:
+                        continue;
+                    case SelectStrategy.SELECT:
+                        strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
+
+                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
+                        // before calling 'selector.wakeup()' to reduce the wake-up
+                        // overhead. (Selector.wakeup() is an expensive operation.)
+                        //
+                        // However, there is a race condition in this approach.
+                        // The race condition is triggered when 'wakenUp' is set to
+                        // true too early.
+                        //
+                        // 'wakenUp' is set to true too early if:
+                        // 1) Selector is waken up between 'wakenUp.set(false)' and
+                        //    'selector.select(...)'. (BAD)
+                        // 2) Selector is waken up between 'selector.select(...)' and
+                        //    'if (wakenUp.get()) { ... }'. (OK)
+                        //
+                        // In the first case, 'wakenUp' is set to true and the
+                        // following 'selector.select(...)' will wake up immediately.
+                        // Until 'wakenUp' is set to false again in the next round,
+                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
+                        // any attempt to wake up the Selector will fail, too, causing
+                        // the following 'selector.select(...)' call to block
+                        // unnecessarily.
+                        //
+                        // To fix this problem, we wake up the selector again if wakenUp
+                        // is true immediately after selector.select(...).
+                        // It is inefficient in that it wakes up the selector for both
+                        // the first case (BAD - wake-up required) and the second case
+                        // (OK - no wake-up required).
+
+                        if (wakenUp == 1) {
+                            Native.eventFdWrite(eventFd.intValue(), 1L);
+                        }
+                        // fallthrough
+                    default:
+                }
+
+                final int ioRatio = this.ioRatio;
+                if (ioRatio == 100) {
+                    try {
+                        if (strategy > 0) {
+                            processReady(events, strategy);
+                        }
+                    } finally {
+                        // Ensure we always run tasks.
+                        runAllTasks();
+                    }
+                } else {
+                    final long ioStartTime = System.nanoTime();
+
+                    try {
+                        if (strategy > 0) {
+                            processReady(events, strategy);
+                        }
+                    } finally {
+                        // Ensure we always run tasks.
+                        final long ioTime = System.nanoTime() - ioStartTime;
+                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
+                    }
+                }
+                if (allowGrowing && strategy == events.length()) {
+                    //increase the size of the array as we needed the whole space for the events
+                    events.increase();
+                }
+            } catch (Throwable t) {
+                handleLoopException(t);
+            }
+            // Always handle shutdown even if the loop processing threw an exception.
+            try {
+                if (isShuttingDown()) {
+                    closeAll();
+                    if (confirmShutdown()) {
+                        break;
+                    }
+                }
+            } catch (Throwable t) {
+                handleLoopException(t);
+            }
+        }
+    }
+
+    private static void handleLoopException(Throwable t) {
+        logger.warn("Unexpected exception in the selector loop.", t);
+
+        // Prevent possible consecutive immediate failures that lead to
+        // excessive CPU consumption.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Ignore.
+        }
+    }
+
+    private void closeAll() {
+        try {
+            epollWaitNow();
+        } catch (IOException ignore) {
+            // ignore on close
+        }
+        // Using the intermediate collection to prevent ConcurrentModificationException.
+        // In the `close()` method, the channel is deleted from `channels` map.
+        Collection<AbstractRawSocketChannel> array = new ArrayList<AbstractRawSocketChannel>(channels.size());
+
+        for (AbstractRawSocketChannel channel: channels.values()) {
+            array.add(channel);
+        }
+
+        for (AbstractRawSocketChannel ch: array) {
+            ch.unsafe().close(ch.unsafe().voidPromise());
+        }
+    }
+
+    private void processReady(RawSocketEventArray events, int ready) {
+        for (int i = 0; i < ready; i ++) {
+            final int fd = events.fd(i);
+            if (fd == eventFd.intValue()) {
+                // consume wakeup event.
+                Native.eventFdRead(fd);
+            } else if (fd == timerFd.intValue()) {
+                // consume wakeup event, necessary because the timer is added with ET mode.
+                Native.timerFdRead(fd);
+            } else {
+                final long ev = events.events(i);
+
+                AbstractRawSocketChannel ch = channels.get(fd);
+                if (ch != null) {
+                    // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
+                    // sure about it!
+                    // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
+                    // past.
+                    AbstractRawSocketUnsafe unsafe = (AbstractRawSocketUnsafe) ch.unsafe();
+
+                    // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before trying
+                    // to read from the file descriptor.
+                    // See https://github.com/netty/netty/issues/3785
+                    //
+                    // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
+                    // In either case epollOutReady() will do the correct thing (finish connecting, or fail
+                    // the connection).
+                    // See https://github.com/netty/netty/issues/3848
+                    if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
+                        // Force flush of data as the epoll is writable again
+                        unsafe.epollOutReady();
+                    }
+
+                    // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
+                    // See https://github.com/netty/netty/issues/4317.
+                    //
+                    // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
+                    // try to read from the underlying file descriptor and so notify the user about the error.
+                    if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
+                        // The Channel is still open and there is something to read. Do it now.
+                        unsafe.epollInReady();
+                    }
+
+                    // Check if EPOLLRDHUP was set; this notifies us of a connection reset, in which case
+                    // we may close the channel directly or try to read more data depending on the state of the
+                    // Channel and also depending on the AbstractEpollChannel subtype.
+                    if ((ev & Native.EPOLLRDHUP) != 0) {
+                        unsafe.epollRdHupReady();
+                    }
+                } else {
+                    // We received an event for an fd which we no longer use. Remove it from the epoll_event set.
+                    try {
+                        Native.epollCtlDel(epollFd.intValue(), fd);
+                    } catch (IOException ignore) {
+                        // This can happen but is nothing we need to worry about, as we only try to delete
+                        // the fd from the epoll set because we did not find it in our mappings. So this call to
+                        // epollCtlDel(...) just ensures we clean up, and it may fail if the fd was
+                        // deleted before or the file descriptor was closed before.
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup() {
+        try {
+            try {
+                epollFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the epoll fd.", e);
+            }
+            try {
+                eventFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the event fd.", e);
+            }
+            try {
+                timerFd.close();
+            } catch (IOException e) {
+                logger.warn("Failed to close the timer fd.", e);
+            }
+        } finally {
+            // release native memory
+            iovArray.release();
+            events.free();
+        }
+    }
+}
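
Reviewer note on run() and epollWait(boolean) above: two pieces of arithmetic drive the loop's timing. The time budget for non-I/O tasks is derived from the measured I/O time as `ioTime * (100 - ioRatio) / ioRatio`, and the wait timeout is split into whole seconds plus remaining nanoseconds before it is handed to the native call. The sketch below isolates both computations; the class and method names are hypothetical and nothing here calls into epoll.

```java
// Hypothetical helper isolating the timing arithmetic of the event loop (illustration only).
final class LoopTimingSketch {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    // Time allowed for queued tasks after an I/O phase that took ioTimeNanos.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // Split a delay into {seconds, nanoseconds}, clamping each part to the int range as the diff does.
    static int[] splitDelay(long totalDelayNanos) {
        int seconds = (int) Math.min(totalDelayNanos / NANOS_PER_SECOND, Integer.MAX_VALUE);
        int nanos = (int) Math.min(totalDelayNanos - seconds * NANOS_PER_SECOND, Integer.MAX_VALUE);
        return new int[] {seconds, nanos};
    }

    public static void main(String[] args) {
        System.out.println("budget at ratio 50: " + taskBudgetNanos(1_000_000L, 50));
        System.out.println("budget at ratio 75: " + taskBudgetNanos(3_000_000L, 75));
        int[] parts = splitDelay(2_500_000_000L);
        System.out.println("delay: " + parts[0] + " s + " + parts[1] + " ns");
    }
}
```

With an ioRatio of 100 the formula yields a budget of zero, which would explain why run() handles that value in a separate branch and calls runAllTasks() without a time limit.
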


[incubator-plc4x] 04/04: Merge branch 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 9267533494e54ae1e279fa82988ef9e2c0f63efe
Merge: cbcd7e4 25a752c
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Jan 9 16:09:02 2018 +0100

    Merge branch 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets

 Jenkinsfile                                        |   4 +-
 applications/iotree/pom.xml                        |   5 +
 .../plc4x/java/applications/iotree/IoTree.java     |  20 +-
 applications/plclogger/pom.xml                     |   5 +
 .../java/applications/plclogger/PlcLogger.java     |  20 +-
 integrations/apache-camel/pom.xml                  |  23 +-
 .../java/org/apache/plc4x/camel/PLC4XProducer.java |   2 +
 .../apache/plc4x/edgent/PlcConnectionAdapter.java  |   8 +-
 .../java/api/connection/AbstractPlcConnection.java |   4 -
 .../PlcUsernamePasswordAuthenticationTest.java     |   0
 .../plc4x/java/api/messages/APIMessageTests.java   |   0
 .../plc4x/java/api/messages/mock/MockAddress.java  |   0
 plc4j/pom.xml                                      |  38 +--
 .../plc4x/java/isotp/netty/IsoTPProtocol.java      | 369 +++++++++++----------
 .../java/isotp/netty/model/types/DeviceGroup.java  |   4 +-
 .../isotp/netty/model/types/DisconnectReason.java  |   4 +-
 .../isotp/netty/model/types/ParameterCode.java     |   4 +-
 .../isotp/netty/model/types/ProtocolClass.java     |   4 +-
 .../java/isotp/netty/model/types/RejectCause.java  |   4 +-
 .../java/isotp/netty/model/types/TpduCode.java     |   4 +-
 .../java/isotp/netty/model/types/TpduSize.java     |   4 +-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   6 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |  73 ++--
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java | 231 +++++++------
 .../netty/model/types/DataTransportErrorCode.java  |   4 +-
 .../s7/netty/model/types/DataTransportSize.java    |   4 +-
 .../java/s7/netty/model/types/MemoryArea.java      |   4 +-
 .../java/s7/netty/model/types/MessageType.java     |   4 +-
 .../java/s7/netty/model/types/ParameterType.java   |   4 +-
 .../s7/netty/model/types/SpecificationType.java    |   4 +-
 .../java/s7/netty/model/types/TransportSize.java   |   4 +-
 .../netty/model/types/VariableAddressingMode.java  |   4 +-
 .../plc4x/java/isotp/netty/IsoTPProtocolTest.java  | 294 +++++++++++++++-
 .../apache/plc4x/java/isotp/netty/MockChannel.java |  18 +
 .../isotp/netty/MockChannelHandlerContext.java     |  18 +
 .../java/isotp/netty/MockChannelPipeline.java      |  18 +
 .../apache/plc4x/java/s7/S7PlcReaderSample.java    |   2 +-
 .../plc4x/java/s7/netty/Plc4XS7ProtocolTest.java   | 205 +++++++++++-
 .../apache/plc4x/java/s7/netty/S7ProtocolTest.java |  62 +++-
 pom.xml                                            | 263 +++++++++------
 ...ject.properties => sonar-project.properties.sav |  17 +
 41 files changed, 1237 insertions(+), 528 deletions(-)


[incubator-plc4x] 02/04: PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP

Posted by cd...@apache.org.

cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 03ba3ce1b1819bcdab17eaf050c1db1f3908f686
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Thu Jan 4 15:37:35 2018 +0100

    PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP
    
    - Initial commit of this effort
    - Added a new module hierarchy for utility modules
---
 plc4j/utils/wireshark-utils/pom.xml | 40 +++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/plc4j/utils/wireshark-utils/pom.xml b/plc4j/utils/wireshark-utils/pom.xml
new file mode 100644
index 0000000..042c91a
--- /dev/null
+++ b/plc4j/utils/wireshark-utils/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.plc4x</groupId>
+    <artifactId>plc4j-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>plc4j-utils-wireshark-utils</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+
+  <name>PLC4J: Utils: WireShark Utils</name>
+  <description>A set of helper utilities that allow reading and writing of `pcapng` files so they can be inspected with WireShark.</description>
+
+  <dependencies>
+  </dependencies>
+
+</project>
\ No newline at end of file
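
Editor's note: the module description above mentions reading and writing `pcapng` files, but this commit adds only the empty POM. As a rough illustration of what writing such a file starts with, the following sketch builds a minimal pcapng Section Header Block (no options, little-endian, unspecified section length). The class and method names are hypothetical and are not part of PLC4X; this is a sketch under those assumptions, not the module's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not part of the PLC4X code base.
final class PcapngSectionHeader {

    // Fixed values defined by the pcapng file format.
    static final int BLOCK_TYPE_SHB = 0x0A0D0D0A;
    static final int BYTE_ORDER_MAGIC = 0x1A2B3C4D;

    // Builds a Section Header Block without options:
    // type (4) + length (4) + magic (4) + major (2) + minor (2)
    // + section length (8) + trailing length (4) = 28 bytes.
    static byte[] sectionHeaderBlock() {
        int totalLength = 28;
        ByteBuffer buf = ByteBuffer.allocate(totalLength).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(BLOCK_TYPE_SHB);
        buf.putInt(totalLength);      // block total length
        buf.putInt(BYTE_ORDER_MAGIC); // lets readers detect the byte order
        buf.putShort((short) 1);      // major version
        buf.putShort((short) 0);      // minor version
        buf.putLong(-1L);             // section length: -1 means "not specified"
        buf.putInt(totalLength);      // block total length, repeated at the end
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(sectionHeaderBlock().length); // prints 28
    }
}
```

A file that Wireshark can open would additionally need at least one Interface Description Block before any packet blocks; that part is omitted here.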

[incubator-plc4x] 01/04: Merge branches 'feature/PLC4X-18--raw-sockets' and 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets

Posted by cd...@apache.org.

cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit a980d6130c650d23a2cae120ec553bcded5b8f7b
Merge: 386c49d fe4b740
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Thu Jan 4 08:31:53 2018 +0100

    Merge branches 'feature/PLC4X-18--raw-sockets' and 'master' of https://gitbox.apache.org/repos/asf/incubator-plc4x into feature/PLC4X-18--raw-sockets

 .gitignore                                         |  1 +
 .../apache/plc4x/edgent/PlcConnectionAdapter.java  |  2 +-
 .../plc4x/java/isotp/netty/IsoTPProtocol.java      | 14 +++++++++-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |  2 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |  2 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java |  2 +-
 .../plc4x/java/isotp/netty/IsoTPProtocolTest.java  | 30 ++++++++++++++++++++++
 .../apache/plc4x/java/s7/S7PlcReaderSample.java    |  2 +-
 .../org/apache/plc4x/java/s7/S7PlcScanner.java     |  2 +-
 .../org/apache/plc4x/java/s7/S7PlcTestConsole.java |  2 +-
 .../apache/plc4x/java/s7/S7PlcWriterSample.java    |  2 +-
 sonar-project.properties                           | 29 +++++++++++++++++++++
 12 files changed, 81 insertions(+), 9 deletions(-)
