You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/01/09 15:09:23 UTC
[incubator-plc4x] 03/04: PLC4X-18 - Implement a Netty Pipeline that
allows creating pipelines for low level protocols below TCP and UDP -
Initial commit of this effort - Added a new module hierarchy for utility
modules
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/PLC4X-18--raw-sockets
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit cbcd7e4e8562fbd99d10b11e98bc59339d1fd15a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Jan 5 09:18:15 2018 +0100
PLC4X-18 - Implement a Netty Pipeline that allows creating pipelines for low level protocols below TCP and UDP - Initial commit of this effort - Added a new module hierarchy for utility modules
---
plc4j/pom.xml | 2 +-
.../rawsockets/netty/AbstractRawSocketChannel.java | 740 ++++++++++++++
.../netty/AbstractRawSocketStreamChannel.java | 1042 ++++++++++++++++++++
.../utils/rawsockets/netty/RawSocketChannel.java | 214 ++--
.../rawsockets/netty/RawSocketChannelConfig.java | 187 ++++
...SocketChannel.java => RawSocketChannelSav.java} | 4 +-
.../rawsockets/netty/RawSocketEventArray.java | 104 ++
.../utils/rawsockets/netty/RawSocketEventLoop.java | 449 +++++++++
8 files changed, 2598 insertions(+), 144 deletions(-)
diff --git a/plc4j/pom.xml b/plc4j/pom.xml
index e9932cb..62d0025 100644
--- a/plc4j/pom.xml
+++ b/plc4j/pom.xml
@@ -39,7 +39,7 @@
<module>api</module>
<module>core</module>
<module>protocols</module>
- <module>utils</module>
+ <!--module>utils</module-->
</modules>
<dependencies>
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
new file mode 100644
index 0000000..a8762ad
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketChannel.java
@@ -0,0 +1,740 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.channel.socket.ChannelInputShutdownReadComplete;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.Socket;
+import io.netty.channel.unix.UnixChannel;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ThrowableUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+abstract class AbstractRawSocketChannel extends AbstractChannel implements Channel {
+ private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+ new ClosedChannelException(), AbstractRawSocketChannel.class, "doClose()");
+ private static final ChannelMetadata METADATA = new ChannelMetadata(false);
+ private final int readFlag;
+ final LinuxSocket socket;
+ /**
+ * The future of the current connection attempt. If not null, subsequent
+ * connection attempts will fail.
+ */
+ private ChannelPromise connectPromise;
+ private ScheduledFuture<?> connectTimeoutFuture;
+ private SocketAddress requestedRemoteAddress;
+
+ private volatile SocketAddress local;
+ private volatile SocketAddress remote;
+
+ protected int flags = Native.EPOLLET;
+ boolean inputClosedSeenErrorOnRead;
+ boolean epollInReadyRunnablePending;
+
+ protected volatile boolean active;
+
+ AbstractRawSocketChannel(LinuxSocket fd, int flag) {
+ this(null, fd, flag, false);
+ }
+
+ AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, boolean active) {
+ super(parent);
+ socket = checkNotNull(fd, "fd");
+ readFlag = flag;
+ flags |= flag;
+ this.active = active;
+ if (active) {
+ // Directly cache the remote and local addresses
+ // See https://github.com/netty/netty/issues/2359
+ local = fd.localAddress();
+ remote = fd.remoteAddress();
+ }
+ }
+
+ AbstractRawSocketChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) {
+ super(parent);
+ socket = checkNotNull(fd, "fd");
+ readFlag = flag;
+ flags |= flag;
+ active = true;
+ // Directly cache the remote and local addresses
+ // See https://github.com/netty/netty/issues/2359
+ this.remote = remote;
+ local = fd.localAddress();
+ }
+
+ static boolean isSoErrorZero(Socket fd) {
+ try {
+ return fd.getSoError() == 0;
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+
+ void setFlag(int flag) throws IOException {
+ if (!isFlagSet(flag)) {
+ flags |= flag;
+ modifyEvents();
+ }
+ }
+
+ void clearFlag(int flag) throws IOException {
+ if (isFlagSet(flag)) {
+ flags &= ~flag;
+ modifyEvents();
+ }
+ }
+
+ boolean isFlagSet(int flag) {
+ return (flags & flag) != 0;
+ }
+
+ @Override
+ public final FileDescriptor fd() {
+ return socket;
+ }
+
+ @Override
+ public abstract RawSocketChannelConfig config();
+
+ @Override
+ public boolean isActive() {
+ return active;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return METADATA;
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ active = false;
+ // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
+ // socket which has not even been connected yet. This has been observed to block during unit tests.
+ inputClosedSeenErrorOnRead = true;
+ try {
+ ChannelPromise promise = connectPromise;
+ if (promise != null) {
+ // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+ promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
+ connectPromise = null;
+ }
+
+ ScheduledFuture<?> future = connectTimeoutFuture;
+ if (future != null) {
+ future.cancel(false);
+ connectTimeoutFuture = null;
+ }
+
+ if (isRegistered()) {
+ // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
+ // if SO_LINGER is used.
+ //
+ // See https://github.com/netty/netty/issues/7159
+ EventLoop loop = eventLoop();
+ if (loop.inEventLoop()) {
+ doDeregister();
+ } else {
+ loop.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doDeregister();
+ } catch (Throwable cause) {
+ pipeline().fireExceptionCaught(cause);
+ }
+ }
+ });
+ }
+ }
+ } finally {
+ socket.close();
+ }
+ }
+
+ @Override
+ protected void doDisconnect() throws Exception {
+ doClose();
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop loop) {
+ return loop instanceof RawSocketEventLoop;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return socket.isOpen();
+ }
+
+ @Override
+ protected void doDeregister() throws Exception {
+ ((RawSocketEventLoop) eventLoop()).remove(this);
+ }
+
+ @Override
+ protected final void doBeginRead() throws Exception {
+ // Channel.read() or ChannelHandlerContext.read() was called
+ final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+ unsafe.readPending = true;
+
+ // We must set the read flag here as it is possible the user didn't read in the last read loop, the
+ // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
+ // never get data after this.
+ setFlag(readFlag);
+
+ // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
+ // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
+ if (unsafe.maybeMoreDataToRead) {
+ unsafe.executeEpollInReadyRunnable(config());
+ }
+ }
+
+ final boolean shouldBreakEpollInReady(ChannelConfig config) {
+ return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
+ }
+
+ private static boolean isAllowHalfClosure(ChannelConfig config) {
+ return config instanceof RawSocketSocketChannelConfig &&
+ ((RawSocketSocketChannelConfig) config).isAllowHalfClosure();
+ }
+
+ final void clearEpollIn() {
+ // Only clear if registered with an EventLoop as otherwise
+ if (isRegistered()) {
+ final EventLoop loop = eventLoop();
+ final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+ if (loop.inEventLoop()) {
+ unsafe.clearEpollIn0();
+ } else {
+ // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
+ loop.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (!unsafe.readPending && !config().isAutoRead()) {
+ // Still no read triggered so clear it now
+ unsafe.clearEpollIn0();
+ }
+ }
+ });
+ }
+ } else {
+ // The EventLoop is not registered atm so just update the flags so the correct value
+ // will be used once the channel is registered
+ flags &= ~readFlag;
+ }
+ }
+
+ private void modifyEvents() throws IOException {
+ if (isOpen() && isRegistered()) {
+ ((RawSocketEventLoop) eventLoop()).modify(this);
+ }
+ }
+
+ @Override
+ protected void doRegister() throws Exception {
+ // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
+ // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
+ // new EventLoop.
+ epollInReadyRunnablePending = false;
+ ((RawSocketEventLoop) eventLoop()).add(this);
+ }
+
+ @Override
+ protected abstract AbstractEpollUnsafe newUnsafe();
+
+ /**
+ * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
+ */
+ protected final ByteBuf newDirectBuffer(ByteBuf buf) {
+ return newDirectBuffer(buf, buf);
+ }
+
+ /**
+ * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
+ * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
+ * this method.
+ */
+ protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
+ final int readableBytes = buf.readableBytes();
+ if (readableBytes == 0) {
+ ReferenceCountUtil.release(holder);
+ return Unpooled.EMPTY_BUFFER;
+ }
+
+ final ByteBufAllocator alloc = alloc();
+ if (alloc.isDirectBufferPooled()) {
+ return newDirectBuffer0(holder, buf, alloc, readableBytes);
+ }
+
+ final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
+ if (directBuf == null) {
+ return newDirectBuffer0(holder, buf, alloc, readableBytes);
+ }
+
+ directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
+ ReferenceCountUtil.safeRelease(holder);
+ return directBuf;
+ }
+
+ private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
+ final ByteBuf directBuf = alloc.directBuffer(capacity);
+ directBuf.writeBytes(buf, buf.readerIndex(), capacity);
+ ReferenceCountUtil.safeRelease(holder);
+ return directBuf;
+ }
+
+ protected static void checkResolvable(InetSocketAddress addr) {
+ if (addr.isUnresolved()) {
+ throw new UnresolvedAddressException();
+ }
+ }
+
+ /**
+ * Read bytes into the given {@link ByteBuf} and return the amount.
+ */
+ protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
+ int writerIndex = byteBuf.writerIndex();
+ int localReadAmount;
+ unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
+ if (byteBuf.hasMemoryAddress()) {
+ localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
+ } else {
+ ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
+ localReadAmount = socket.read(buf, buf.position(), buf.limit());
+ }
+ if (localReadAmount > 0) {
+ byteBuf.writerIndex(writerIndex + localReadAmount);
+ }
+ return localReadAmount;
+ }
+
+ protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+ if (buf.hasMemoryAddress()) {
+ int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
+ if (localFlushedAmount > 0) {
+ in.removeBytes(localFlushedAmount);
+ return 1;
+ }
+ } else {
+ final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
+ buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
+ int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
+ if (localFlushedAmount > 0) {
+ nioBuf.position(nioBuf.position() + localFlushedAmount);
+ in.removeBytes(localFlushedAmount);
+ return 1;
+ }
+ }
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+
+ protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
+ boolean readPending;
+ boolean maybeMoreDataToRead;
+ private RawSocketRecvByteAllocatorHandle allocHandle;
+ private final Runnable epollInReadyRunnable = new Runnable() {
+ @Override
+ public void run() {
+ epollInReadyRunnablePending = false;
+ epollInReady();
+ }
+ };
+
+ /**
+ * Called once EPOLLIN event is ready to be processed
+ */
+ abstract void epollInReady();
+
+ final void epollInBefore() { maybeMoreDataToRead = false; }
+
+ final void epollInFinally(ChannelConfig config) {
+ maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
+ // Check if there is a readPending which was not processed yet.
+ // This could be for two reasons:
+ // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
+ // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
+ //
+ // See https://github.com/netty/netty/issues/2254
+ if (!readPending && !config.isAutoRead()) {
+ clearEpollIn();
+ } else if (readPending && maybeMoreDataToRead) {
+ // trigger a read again as there may be something left to read and because of epoll ET we
+ // will not get notified again until we read everything from the socket
+ //
+ // It is possible the last fireChannelRead call could cause the user to call read() again, or if
+ // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
+ // to false before every read operation to prevent re-entry into epollInReady() we will not read from
+ // the underlying OS again unless the user happens to call read again.
+ executeEpollInReadyRunnable(config);
+ }
+ }
+
+ final void executeEpollInReadyRunnable(ChannelConfig config) {
+ if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
+ return;
+ }
+ epollInReadyRunnablePending = true;
+ eventLoop().execute(epollInReadyRunnable);
+ }
+
+ /**
+ * Called once EPOLLRDHUP event is ready to be processed
+ */
+ final void epollRdHupReady() {
+ // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
+ recvBufAllocHandle().receivedRdHup();
+
+ if (isActive()) {
+ // If it is still active, we need to call epollInReady as otherwise we may miss to
+ // read pending data from the underlying file descriptor.
+ // See https://github.com/netty/netty/issues/3709
+ epollInReady();
+ } else {
+ // Just to be safe make sure the input marked as closed.
+ shutdownInput(true);
+ }
+
+ // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
+ clearEpollRdHup();
+ }
+
+ /**
+ * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
+ */
+ private void clearEpollRdHup() {
+ try {
+ clearFlag(Native.EPOLLRDHUP);
+ } catch (IOException e) {
+ pipeline().fireExceptionCaught(e);
+ close(voidPromise());
+ }
+ }
+
+ /**
+ * Shutdown the input side of the channel.
+ */
+ void shutdownInput(boolean rdHup) {
+ if (!socket.isInputShutdown()) {
+ if (isAllowHalfClosure(config())) {
+ try {
+ socket.shutdown(true, false);
+ } catch (IOException ignored) {
+ // We attempted to shutdown and failed, which means the input has already effectively been
+ // shutdown.
+ fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
+ return;
+ } catch (NotYetConnectedException ignore) {
+ // We attempted to shutdown and failed, which means the input has already effectively been
+ // shutdown.
+ }
+ clearEpollIn();
+ pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
+ } else {
+ close(voidPromise());
+ }
+ } else if (!rdHup) {
+ inputClosedSeenErrorOnRead = true;
+ pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
+ }
+ }
+
+ private void fireEventAndClose(Object evt) {
+ pipeline().fireUserEventTriggered(evt);
+ close(voidPromise());
+ }
+
+ @Override
+ public RawSocketRecvByteAllocatorHandle recvBufAllocHandle() {
+ if (allocHandle == null) {
+ allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
+ }
+ return allocHandle;
+ }
+
+ /**
+ * Create a new {@link RawSocketRecvByteAllocatorHandle} instance.
+ * @param handle The handle to wrap with EPOLL specific logic.
+ */
+ RawSocketRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+ return new RawSocketRecvByteAllocatorHandle(handle);
+ }
+
+ @Override
+ protected void flush0() {
+ // Flush immediately only when there's no pending flush.
+ // If there's a pending flush operation, event loop will call forceFlush() later,
+ // and thus there's no need to call it now.
+ if (isFlagSet(Native.EPOLLOUT)) {
+ return;
+ }
+ super.flush0();
+ }
+
+ /**
+ * Called once a EPOLLOUT event is ready to be processed
+ */
+ final void epollOutReady() {
+ if (connectPromise != null) {
+ // pending connect which is now complete so handle it.
+ finishConnect();
+ } else if (!socket.isOutputShutdown()) {
+ // directly call super.flush0() to force a flush now
+ super.flush0();
+ }
+ }
+
+ protected final void clearEpollIn0() {
+ assert eventLoop().inEventLoop();
+ try {
+ readPending = false;
+ clearFlag(readFlag);
+ } catch (IOException e) {
+ // When this happens there is something completely wrong with either the filedescriptor or epoll,
+ // so fire the exception through the pipeline and close the Channel.
+ pipeline().fireExceptionCaught(e);
+ unsafe().close(unsafe().voidPromise());
+ }
+ }
+
+ @Override
+ public void connect(
+ final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
+ if (!promise.setUncancellable() || !ensureOpen(promise)) {
+ return;
+ }
+
+ try {
+ if (connectPromise != null) {
+ throw new ConnectionPendingException();
+ }
+
+ boolean wasActive = isActive();
+ if (doConnect(remoteAddress, localAddress)) {
+ fulfillConnectPromise(promise, wasActive);
+ } else {
+ connectPromise = promise;
+ requestedRemoteAddress = remoteAddress;
+
+ // Schedule connect timeout.
+ int connectTimeoutMillis = config().getConnectTimeoutMillis();
+ if (connectTimeoutMillis > 0) {
+ connectTimeoutFuture = eventLoop().schedule(new Runnable() {
+ @Override
+ public void run() {
+ ChannelPromise connectPromise = AbstractRawSocketChannel.this.connectPromise;
+ ConnectTimeoutException cause =
+ new ConnectTimeoutException("connection timed out: " + remoteAddress);
+ if (connectPromise != null && connectPromise.tryFailure(cause)) {
+ close(voidPromise());
+ }
+ }
+ }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ promise.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isCancelled()) {
+ if (connectTimeoutFuture != null) {
+ connectTimeoutFuture.cancel(false);
+ }
+ connectPromise = null;
+ close(voidPromise());
+ }
+ }
+ });
+ }
+ } catch (Throwable t) {
+ closeIfClosed();
+ promise.tryFailure(annotateConnectException(t, remoteAddress));
+ }
+ }
+
+ private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
+ if (promise == null) {
+ // Closed via cancellation and the promise has been notified already.
+ return;
+ }
+ active = true;
+
+ // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
+ // We still need to ensure we call fireChannelActive() in this case.
+ boolean active = isActive();
+
+ // trySuccess() will return false if a user cancelled the connection attempt.
+ boolean promiseSet = promise.trySuccess();
+
+ // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
+ // because what happened is what happened.
+ if (!wasActive && active) {
+ pipeline().fireChannelActive();
+ }
+
+ // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
+ if (!promiseSet) {
+ close(voidPromise());
+ }
+ }
+
+ private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
+ if (promise == null) {
+ // Closed via cancellation and the promise has been notified already.
+ return;
+ }
+
+ // Use tryFailure() instead of setFailure() to avoid the race against cancel().
+ promise.tryFailure(cause);
+ closeIfClosed();
+ }
+
+ private void finishConnect() {
+ // Note this method is invoked by the event loop only if the connection attempt was
+ // neither cancelled nor timed out.
+
+ assert eventLoop().inEventLoop();
+
+ boolean connectStillInProgress = false;
+ try {
+ boolean wasActive = isActive();
+ if (!doFinishConnect()) {
+ connectStillInProgress = true;
+ return;
+ }
+ fulfillConnectPromise(connectPromise, wasActive);
+ } catch (Throwable t) {
+ fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
+ } finally {
+ if (!connectStillInProgress) {
+ // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
+ // See https://github.com/netty/netty/issues/1770
+ if (connectTimeoutFuture != null) {
+ connectTimeoutFuture.cancel(false);
+ }
+ connectPromise = null;
+ }
+ }
+ }
+
+ /**
+ * Finish the connect
+ */
+ private boolean doFinishConnect() throws Exception {
+ if (socket.finishConnect()) {
+ clearFlag(Native.EPOLLOUT);
+ if (requestedRemoteAddress instanceof InetSocketAddress) {
+ remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
+ }
+ requestedRemoteAddress = null;
+
+ return true;
+ }
+ setFlag(Native.EPOLLOUT);
+ return false;
+ }
+ }
+
+ @Override
+ protected void doBind(SocketAddress local) throws Exception {
+ if (local instanceof InetSocketAddress) {
+ checkResolvable((InetSocketAddress) local);
+ }
+ socket.bind(local);
+ this.local = socket.localAddress();
+ }
+
+ /**
+ * Connect to the remote peer
+ */
+ protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+ if (localAddress instanceof InetSocketAddress) {
+ checkResolvable((InetSocketAddress) localAddress);
+ }
+
+ InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
+ ? (InetSocketAddress) remoteAddress : null;
+ if (remoteSocketAddr != null) {
+ checkResolvable(remoteSocketAddr);
+ }
+
+ if (remote != null) {
+ // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
+ // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
+ // later.
+ throw new AlreadyConnectedException();
+ }
+
+ if (localAddress != null) {
+ socket.bind(localAddress);
+ }
+
+ boolean connected = doConnect0(remoteAddress);
+ if (connected) {
+ remote = remoteSocketAddr == null ?
+ remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
+ }
+ // We always need to set the localAddress even if not connected yet as the bind already took place.
+ //
+ // See https://github.com/netty/netty/issues/3463
+ local = socket.localAddress();
+ return connected;
+ }
+
+ private boolean doConnect0(SocketAddress remote) throws Exception {
+ boolean success = false;
+ try {
+ boolean connected = socket.connect(remote);
+ if (!connected) {
+ setFlag(Native.EPOLLOUT);
+ }
+ success = true;
+ return connected;
+ } finally {
+ if (!success) {
+ doClose();
+ }
+ }
+ }
+
+ @Override
+ protected SocketAddress localAddress0() {
+ return local;
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return remote;
+ }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
new file mode 100644
index 0000000..a1f2ff4
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/AbstractRawSocketStreamChannel.java
@@ -0,0 +1,1042 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+import io.netty.channel.internal.ChannelUtils;
+import io.netty.channel.socket.DuplexChannel;
+import io.netty.channel.unix.FileDescriptor;
+import io.netty.channel.unix.IovArray;
+import io.netty.channel.unix.SocketWritableByteChannel;
+import io.netty.channel.unix.UnixChannelUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.StringUtil;
+import io.netty.util.internal.ThrowableUtil;
+import io.netty.util.internal.UnstableApi;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
+import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
+import static io.netty.channel.unix.FileDescriptor.pipe;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+public abstract class AbstractRawSocketStreamChannel extends AbstractRawSocketChannel implements DuplexChannel {
+ private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
+ private static final String EXPECTED_TYPES =
+ " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
+ StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRawSocketStreamChannel.class);
+ private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
+ ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+ AbstractRawSocketStreamChannel.class, "clearSpliceQueue()");
+ private static final ClosedChannelException SPLICE_TO_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+ new ClosedChannelException(),
+ AbstractRawSocketStreamChannel.class, "spliceTo(...)");
+ private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
+ ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
+ AbstractRawSocketStreamChannel.class, "failSpliceIfClosed(...)");
+ private final Runnable flushTask = new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ };
+ private Queue<SpliceInTask> spliceQueue;
+
+ // Lazy init these if we need to splice(...)
+ private FileDescriptor pipeIn;
+ private FileDescriptor pipeOut;
+
+ private WritableByteChannel byteChannel;
+
+ protected AbstractRawSocketStreamChannel(Channel parent, int fd) {
+ this(parent, new LinuxSocket(fd));
+ }
+
+ protected AbstractRawSocketStreamChannel(int fd) {
+ this(new LinuxSocket(fd));
+ }
+
+ AbstractRawSocketStreamChannel(LinuxSocket fd) {
+ this(fd, isSoErrorZero(fd));
+ }
+
+ AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd) {
+ super(parent, fd, Native.EPOLLIN, true);
+ // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+ flags |= Native.EPOLLRDHUP;
+ }
+
+ AbstractRawSocketStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
+ super(parent, fd, Native.EPOLLIN, remote);
+ // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+ flags |= Native.EPOLLRDHUP;
+ }
+
+ protected AbstractRawSocketStreamChannel(LinuxSocket fd, boolean active) {
+ super(null, fd, Native.EPOLLIN, active);
+ // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
+ flags |= Native.EPOLLRDHUP;
+ }
+
+ @Override
+ protected AbstractEpollUnsafe newUnsafe() {
+ return new EpollStreamUnsafe();
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return METADATA;
+ }
+
+ /**
+ * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+ * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+ * splice until the {@link ChannelFuture} was canceled or it was failed.
+ *
+ * Please note:
+ * <ul>
+ * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+ * {@link IllegalArgumentException} is thrown. </li>
+ * <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+ * target {@link AbstractRawSocketStreamChannel}</li>
+ * </ul>
+ *
+ */
+ public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len) {
+ return spliceTo(ch, len, newPromise());
+ }
+
+ /**
+ * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link AbstractRawSocketStreamChannel}.
+ * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
+ * splice until the {@link ChannelFuture} was canceled or it was failed.
+ *
+ * Please note:
+ * <ul>
+ * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
+ * {@link IllegalArgumentException} is thrown. </li>
+ * <li>{@link RawSocketChannelConfig#getEpollMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this and the
+ * target {@link AbstractRawSocketStreamChannel}</li>
+ * </ul>
+ *
+ */
+ public final ChannelFuture spliceTo(final AbstractRawSocketStreamChannel ch, final int len,
+ final ChannelPromise promise) {
+ if (ch.eventLoop() != eventLoop()) {
+ throw new IllegalArgumentException("EventLoops are not the same.");
+ }
+ if (len < 0) {
+ throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+ }
+ if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
+ || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
+ throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
+ }
+ checkNotNull(promise, "promise");
+ if (!isOpen()) {
+ promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+ } else {
+ addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
+ failSpliceIfClosed(promise);
+ }
+ return promise;
+ }
+
+ /**
+ * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+ * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+ * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+ * {@link ChannelFuture} was canceled or it was failed.
+ *
+ * Please note:
+ * <ul>
+ * <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+ * {@link AbstractRawSocketStreamChannel}</li>
+ * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
+ * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+ * </ul>
+ */
+ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
+ return spliceTo(ch, offset, len, newPromise());
+ }
+
+ /**
+ * Splice from this {@link AbstractRawSocketStreamChannel} to another {@link FileDescriptor}.
+ * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
+ * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
+ * {@link ChannelFuture} was canceled or it was failed.
+ *
+ * Please note:
+ * <ul>
+ * <li>{@link RawSocketChannelConfig#getRawSocketMode()} must be {@link RawSocketMode#LEVEL_TRIGGERED} for this
+ * {@link AbstractRawSocketStreamChannel}</li>
+ * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
+ * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
+ * </ul>
+ */
+ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
+ final ChannelPromise promise) {
+ if (len < 0) {
+ throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
+ }
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
+ }
+ if (config().getRawSocketMode() != RawSocketMode.LEVEL_TRIGGERED) {
+ throw new IllegalStateException("spliceTo() supported only when using " + RawSocketMode.LEVEL_TRIGGERED);
+ }
+ checkNotNull(promise, "promise");
+ if (!isOpen()) {
+ promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
+ } else {
+ addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
+ failSpliceIfClosed(promise);
+ }
+ return promise;
+ }
+
+ private void failSpliceIfClosed(ChannelPromise promise) {
+ if (!isOpen()) {
+ // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
+ // cases where a future may not be notified otherwise.
+ if (promise.tryFailure(FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION)) {
+ eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ // Call this via the EventLoop as it is a MPSC queue.
+ clearSpliceQueue();
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
+ * @param in the collection which contains objects to write.
+ * @param buf the {@link ByteBuf} from which the bytes should be written
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ */
+ private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
+ int readableBytes = buf.readableBytes();
+ if (readableBytes == 0) {
+ in.remove();
+ return 0;
+ }
+
+ if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
+ return doWriteBytes(in, buf);
+ } else {
+ ByteBuffer[] nioBuffers = buf.nioBuffers();
+ return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
+ config().getMaxBytesPerGatheringWrite());
+ }
+ }
+
+ private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
+ // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
+ // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
+ // make a best effort to adjust as OS behavior changes.
+ if (attempted == written) {
+ if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
+ config().setMaxBytesPerGatheringWrite(attempted << 1);
+ }
+ } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
+ config().setMaxBytesPerGatheringWrite(attempted >>> 1);
+ }
+ }
+
+ /**
+ * Write multiple bytes via {@link IovArray}.
+ * @param in the collection which contains objects to write.
+ * @param array The array which contains the content to write.
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ * @throws IOException If an I/O exception occurs during write.
+ */
+ private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
+ final long expectedWrittenBytes = array.size();
+ assert expectedWrittenBytes != 0;
+ final int cnt = array.count();
+ assert cnt != 0;
+
+ final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
+ if (localWrittenBytes > 0) {
+ adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
+ in.removeBytes(localWrittenBytes);
+ return 1;
+ }
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+
+ /**
+ * Write multiple bytes via {@link ByteBuffer} array.
+ * @param in the collection which contains objects to write.
+ * @param nioBuffers The buffers to write.
+ * @param nioBufferCnt The number of buffers to write.
+ * @param expectedWrittenBytes The number of bytes we expect to write.
+ * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ * @throws IOException If an I/O exception occurs during write.
+ */
+ private int writeBytesMultiple(
+ ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
+ long maxBytesPerGatheringWrite) throws IOException {
+ assert expectedWrittenBytes != 0;
+ if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
+ expectedWrittenBytes = maxBytesPerGatheringWrite;
+ }
+
+ final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
+ if (localWrittenBytes > 0) {
+ adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
+ in.removeBytes(localWrittenBytes);
+ return 1;
+ }
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+
+ /**
+ * Write a {@link DefaultFileRegion}
+ * @param in the collection which contains objects to write.
+ * @param region the {@link DefaultFileRegion} from which the bytes should be written
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ */
+ private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
+ final long regionCount = region.count();
+ if (region.transferred() >= regionCount) {
+ in.remove();
+ return 0;
+ }
+
+ final long offset = region.transferred();
+ final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
+ if (flushedAmount > 0) {
+ in.progress(flushedAmount);
+ if (region.transferred() >= regionCount) {
+ in.remove();
+ }
+ return 1;
+ }
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+
+ /**
+ * Write a {@link FileRegion}
+ * @param in the collection which contains objects to write.
+ * @param region the {@link FileRegion} from which the bytes should be written
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ */
+ private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
+ if (region.transferred() >= region.count()) {
+ in.remove();
+ return 0;
+ }
+
+ if (byteChannel == null) {
+ byteChannel = new RawSocketSocketWritableByteChannel();
+ }
+ final long flushedAmount = region.transferTo(byteChannel, region.transferred());
+ if (flushedAmount > 0) {
+ in.progress(flushedAmount);
+ if (region.transferred() >= region.count()) {
+ in.remove();
+ }
+ return 1;
+ }
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+
+ @Override
+ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+ int writeSpinCount = config().getWriteSpinCount();
+ do {
+ final int msgCount = in.size();
+ // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
+ if (msgCount > 1 && in.current() instanceof ByteBuf) {
+ writeSpinCount -= doWriteMultiple(in);
+ } else if (msgCount == 0) {
+ // Wrote all messages.
+ clearFlag(Native.EPOLLOUT);
+ // Return here so we not set the EPOLLOUT flag.
+ return;
+ } else { // msgCount == 1
+ writeSpinCount -= doWriteSingle(in);
+ }
+
+ // We do not break the loop here even if the outbound buffer was flushed completely,
+ // because a user might have triggered another write and flush when we notify his or her
+ // listeners.
+ } while (writeSpinCount > 0);
+
+ if (writeSpinCount == 0) {
+ // We used our writeSpin quantum, and should try to write again later.
+ eventLoop().execute(flushTask);
+ } else {
+ // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
+ // when it can accept more data.
+ setFlag(Native.EPOLLOUT);
+ }
+ }
+
+ /**
+ * Attempt to write a single object.
+ * @param in the collection which contains objects to write.
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ * @throws Exception If an I/O error occurs.
+ */
+ protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
+ // The outbound buffer contains only one message or it contains a file region.
+ Object msg = in.current();
+ if (msg instanceof ByteBuf) {
+ return writeBytes(in, (ByteBuf) msg);
+ } else if (msg instanceof DefaultFileRegion) {
+ return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
+ } else if (msg instanceof FileRegion) {
+ return writeFileRegion(in, (FileRegion) msg);
+ } else if (msg instanceof SpliceOutTask) {
+ if (!((SpliceOutTask) msg).spliceOut()) {
+ return WRITE_STATUS_SNDBUF_FULL;
+ }
+ in.remove();
+ return 1;
+ } else {
+ // Should never reach here.
+ throw new Error();
+ }
+ }
+
+ /**
+ * Attempt to write multiple {@link ByteBuf} objects.
+ * @param in the collection which contains objects to write.
+ * @return The value that should be decremented from the write quantum which starts at
+ * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
+ * <ul>
+ * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
+ * is encountered</li>
+ * <li>1 - if a single call to write data was made to the OS</li>
+ * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
+ * no data was accepted</li>
+ * </ul>
+ * @throws Exception If an I/O error occurs.
+ */
+ private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
+ final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
+ if (PlatformDependent.hasUnsafe()) {
+ IovArray array = ((RawSocketEventLoop) eventLoop()).cleanArray();
+ array.maxBytes(maxBytesPerGatheringWrite);
+ in.forEachFlushedMessage(array);
+
+ if (array.count() >= 1) {
+ // TODO: Handle the case where cnt == 1 specially.
+ return writeBytesMultiple(in, array);
+ }
+ } else {
+ ByteBuffer[] buffers = in.nioBuffers();
+ int cnt = in.nioBufferCount();
+ if (cnt >= 1) {
+ // TODO: Handle the case where cnt == 1 specially.
+ return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
+ }
+ }
+ // cnt == 0, which means the outbound buffer contained empty buffers only.
+ in.removeBytes(0);
+ return 0;
+ }
+
+ @Override
+ protected Object filterOutboundMessage(Object msg) {
+ if (msg instanceof ByteBuf) {
+ ByteBuf buf = (ByteBuf) msg;
+ return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
+ }
+
+ if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
+ return msg;
+ }
+
+ throw new UnsupportedOperationException(
+ "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
+ }
+
+ @UnstableApi
+ @Override
+ protected final void doShutdownOutput() throws Exception {
+ socket.shutdown(false, true);
+ }
+
+ private void shutdownInput0(final ChannelPromise promise) {
+ try {
+ socket.shutdown(true, false);
+ promise.setSuccess();
+ } catch (Throwable cause) {
+ promise.setFailure(cause);
+ }
+ }
+
+ @Override
+ public boolean isOutputShutdown() {
+ return socket.isOutputShutdown();
+ }
+
+ @Override
+ public boolean isInputShutdown() {
+ return socket.isInputShutdown();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return socket.isShutdown();
+ }
+
+ @Override
+ public ChannelFuture shutdownOutput() {
+ return shutdownOutput(newPromise());
+ }
+
+ @Override
+ public ChannelFuture shutdownOutput(final ChannelPromise promise) {
+ EventLoop loop = eventLoop();
+ if (loop.inEventLoop()) {
+ ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+ } else {
+ loop.execute(new Runnable() {
+ @Override
+ public void run() {
+ ((AbstractChannel.AbstractUnsafe) unsafe()).shutdownOutput(promise);
+ }
+ });
+ }
+
+ return promise;
+ }
+
+ @Override
+ public ChannelFuture shutdownInput() {
+ return shutdownInput(newPromise());
+ }
+
+ @Override
+ public ChannelFuture shutdownInput(final ChannelPromise promise) {
+ Executor closeExecutor = ((RawSocketStreamUnsafe) unsafe()).prepareToClose();
+ if (closeExecutor != null) {
+ closeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ shutdownInput0(promise);
+ }
+ });
+ } else {
+ EventLoop loop = eventLoop();
+ if (loop.inEventLoop()) {
+ shutdownInput0(promise);
+ } else {
+ loop.execute(new Runnable() {
+ @Override
+ public void run() {
+ shutdownInput0(promise);
+ }
+ });
+ }
+ }
+ return promise;
+ }
+
+ @Override
+ public ChannelFuture shutdown() {
+ return shutdown(newPromise());
+ }
+
+ @Override
+ public ChannelFuture shutdown(final ChannelPromise promise) {
+ ChannelFuture shutdownOutputFuture = shutdownOutput();
+ if (shutdownOutputFuture.isDone()) {
+ shutdownOutputDone(shutdownOutputFuture, promise);
+ } else {
+ shutdownOutputFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
+ shutdownOutputDone(shutdownOutputFuture, promise);
+ }
+ });
+ }
+ return promise;
+ }
+
+ private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
+ ChannelFuture shutdownInputFuture = shutdownInput();
+ if (shutdownInputFuture.isDone()) {
+ shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+ } else {
+ shutdownInputFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
+ shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
+ }
+ });
+ }
+ }
+
+ private static void shutdownDone(ChannelFuture shutdownOutputFuture,
+ ChannelFuture shutdownInputFuture,
+ ChannelPromise promise) {
+ Throwable shutdownOutputCause = shutdownOutputFuture.cause();
+ Throwable shutdownInputCause = shutdownInputFuture.cause();
+ if (shutdownOutputCause != null) {
+ if (shutdownInputCause != null) {
+ logger.debug("Exception suppressed because a previous exception occurred.",
+ shutdownInputCause);
+ }
+ promise.setFailure(shutdownOutputCause);
+ } else if (shutdownInputCause != null) {
+ promise.setFailure(shutdownInputCause);
+ } else {
+ promise.setSuccess();
+ }
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ try {
+ // Calling super.doClose() first so spliceTo(...) will fail on next call.
+ super.doClose();
+ } finally {
+ safeClosePipe(pipeIn);
+ safeClosePipe(pipeOut);
+ clearSpliceQueue();
+ }
+ }
+
+ private void clearSpliceQueue() {
+ if (spliceQueue == null) {
+ return;
+ }
+ for (;;) {
+ SpliceInTask task = spliceQueue.poll();
+ if (task == null) {
+ break;
+ }
+ task.promise.tryFailure(CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION);
+ }
+ }
+
+ private static void safeClosePipe(FileDescriptor fd) {
+ if (fd != null) {
+ try {
+ fd.close();
+ } catch (IOException e) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("Error while closing a pipe", e);
+ }
+ }
+ }
+ }
+
+ class RawSocketStreamUnsafe extends AbstractRawSocketUnsafe {
+ // Overridden here just to be able to access this method from AbstractRawSocketStreamChannel
+ @Override
+ protected Executor prepareToClose() {
+ return super.prepareToClose();
+ }
+
+ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
+ RawSocketRecvByteAllocatorHandle allocHandle) {
+ if (byteBuf != null) {
+ if (byteBuf.isReadable()) {
+ readPending = false;
+ pipeline.fireChannelRead(byteBuf);
+ } else {
+ byteBuf.release();
+ }
+ }
+ allocHandle.readComplete();
+ pipeline.fireChannelReadComplete();
+ pipeline.fireExceptionCaught(cause);
+ if (close || cause instanceof IOException) {
+ shutdownInput(false);
+ }
+ }
+
+ @Override
+ RawSocketRecvByteAllocatorHandle newRawSocketHandle(RecvByteBufAllocator.ExtendedHandle handle) {
+ return new RawSocketRecvByteAllocatorStreamingHandle(handle);
+ }
+
+ @Override
+ void RawSocketInReady() {
+ final ChannelConfig config = config();
+ if (shouldBreakRawSocketInReady(config)) {
+ clearRawSocketIn0();
+ return;
+ }
+ final RawSocketRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+ allocHandle.edgeTriggered(isFlagSet(Native.RawSocketET));
+
+ final ChannelPipeline pipeline = pipeline();
+ final ByteBufAllocator allocator = config.getAllocator();
+ allocHandle.reset(config);
+ RawSocketInBefore();
+
+ ByteBuf byteBuf = null;
+ boolean close = false;
+ try {
+ do {
+ if (spliceQueue != null) {
+ SpliceInTask spliceTask = spliceQueue.peek();
+ if (spliceTask != null) {
+ if (spliceTask.spliceIn(allocHandle)) {
+ // We need to check if it is still active as if not we removed all SpliceTasks in
+ // doClose(...)
+ if (isActive()) {
+ spliceQueue.remove();
+ }
+ continue;
+ } else {
+ break;
+ }
+ }
+ }
+
+ // we use a direct buffer here as the native implementations only be able
+ // to handle direct buffers.
+ byteBuf = allocHandle.allocate(allocator);
+ allocHandle.lastBytesRead(doReadBytes(byteBuf));
+ if (allocHandle.lastBytesRead() <= 0) {
+ // nothing was read, release the buffer.
+ byteBuf.release();
+ byteBuf = null;
+ close = allocHandle.lastBytesRead() < 0;
+ if (close) {
+ // There is nothing left to read as we received an EOF.
+ readPending = false;
+ }
+ break;
+ }
+ allocHandle.incMessagesRead(1);
+ readPending = false;
+ pipeline.fireChannelRead(byteBuf);
+ byteBuf = null;
+
+ if (shouldBreakRawSocketInReady(config)) {
+ // We need to do this for two reasons:
+ //
+ // - If the input was shutdown in between (which may be the case when the user did it in the
+ // fireChannelRead(...) method we should not try to read again to not produce any
+ // miss-leading exceptions.
+ //
+ // - If the user closes the channel we need to ensure we not try to read from it again as
+ // the filedescriptor may be re-used already by the OS if the system is handling a lot of
+ // concurrent connections and so needs a lot of filedescriptors. If not do this we risk
+ // reading data from a filedescriptor that belongs to another socket then the socket that
+ // was "wrapped" by this Channel implementation.
+ break;
+ }
+ } while (allocHandle.continueReading());
+
+ allocHandle.readComplete();
+ pipeline.fireChannelReadComplete();
+
+ if (close) {
+ shutdownInput(false);
+ }
+ } catch (Throwable t) {
+ handleReadException(pipeline, byteBuf, t, close, allocHandle);
+ } finally {
+ RawSocketInFinally(config);
+ }
+ }
+ }
+
+ private void addToSpliceQueue(final SpliceInTask task) {
+ EventLoop eventLoop = eventLoop();
+ if (eventLoop.inEventLoop()) {
+ addToSpliceQueue0(task);
+ } else {
+ eventLoop.execute(new Runnable() {
+ @Override
+ public void run() {
+ addToSpliceQueue0(task);
+ }
+ });
+ }
+ }
+
+ private void addToSpliceQueue0(SpliceInTask task) {
+ if (spliceQueue == null) {
+ spliceQueue = PlatformDependent.newMpscQueue();
+ }
+ spliceQueue.add(task);
+ }
+
+ protected abstract class SpliceInTask {
+ final ChannelPromise promise;
+ int len;
+
+ protected SpliceInTask(int len, ChannelPromise promise) {
+ this.promise = promise;
+ this.len = len;
+ }
+
+ abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
+
+ protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
+ // calculate the maximum amount of data we are allowed to splice
+ int length = Math.min(handle.guess(), len);
+ int splicedIn = 0;
+ for (;;) {
+ // Splicing until there is nothing left to splice.
+ int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
+ if (localSplicedIn == 0) {
+ break;
+ }
+ splicedIn += localSplicedIn;
+ length -= localSplicedIn;
+ }
+
+ return splicedIn;
+ }
+ }
+
+ // Let it directly implement channelFutureListener as well to reduce object creation.
+ private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
+ private final AbstractRawSocketStreamChannel ch;
+
+ SpliceInChannelTask(AbstractRawSocketStreamChannel ch, int len, ChannelPromise promise) {
+ super(len, promise);
+ this.ch = ch;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ promise.setFailure(future.cause());
+ }
+ }
+
+ @Override
+ public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+ assert ch.eventLoop().inEventLoop();
+ if (len == 0) {
+ promise.setSuccess();
+ return true;
+ }
+ try {
+ // We create the pipe on the target channel as this will allow us to just handle pending writes
+ // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
+ // on multiple Channels pointing to one target Channel.
+ FileDescriptor pipeOut = ch.pipeOut;
+ if (pipeOut == null) {
+ // Create a new pipe as non was created before.
+ FileDescriptor[] pipe = pipe();
+ ch.pipeIn = pipe[0];
+ pipeOut = ch.pipeOut = pipe[1];
+ }
+
+ int splicedIn = spliceIn(pipeOut, handle);
+ if (splicedIn > 0) {
+ // Integer.MAX_VALUE is a special value which will result in splice forever.
+ if (len != Integer.MAX_VALUE) {
+ len -= splicedIn;
+ }
+
+ // Depending on if we are done with splicing inbound data we set the right promise for the
+ // outbound splicing.
+ final ChannelPromise splicePromise;
+ if (len == 0) {
+ splicePromise = promise;
+ } else {
+ splicePromise = ch.newPromise().addListener(this);
+ }
+
+ boolean autoRead = config().isAutoRead();
+
+ // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
+ // case.
+ ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
+ ch.unsafe().flush();
+ if (autoRead && !splicePromise.isDone()) {
+ // Write was not done which means the target channel was not writable. In this case we need to
+ // disable reading until we are done with splicing to the target channel because:
+ //
+ // - The user may want to to trigger another splice operation once the splicing was complete.
+ config().setAutoRead(false);
+ }
+ }
+
+ return len == 0;
+ } catch (Throwable cause) {
+ promise.setFailure(cause);
+ return true;
+ }
+ }
+ }
+
+ private final class SpliceOutTask {
+ private final AbstractRawSocketStreamChannel ch;
+ private final boolean autoRead;
+ private int len;
+
+ SpliceOutTask(AbstractRawSocketStreamChannel ch, int len, boolean autoRead) {
+ this.ch = ch;
+ this.len = len;
+ this.autoRead = autoRead;
+ }
+
+ public boolean spliceOut() throws Exception {
+ assert ch.eventLoop().inEventLoop();
+ try {
+ int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
+ len -= splicedOut;
+ if (len == 0) {
+ if (autoRead) {
+ // AutoRead was used and we spliced everything so start reading again
+ config().setAutoRead(true);
+ }
+ return true;
+ }
+ return false;
+ } catch (IOException e) {
+ if (autoRead) {
+ // AutoRead was used and we spliced everything so start reading again
+ config().setAutoRead(true);
+ }
+ throw e;
+ }
+ }
+ }
+
+ private final class SpliceFdTask extends SpliceInTask {
+ private final FileDescriptor fd;
+ private final ChannelPromise promise;
+ private final int offset;
+
+ SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
+ super(len, promise);
+ this.fd = fd;
+ this.promise = promise;
+ this.offset = offset;
+ }
+
+ @Override
+ public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
+ assert eventLoop().inEventLoop();
+ if (len == 0) {
+ promise.setSuccess();
+ return true;
+ }
+
+ try {
+ FileDescriptor[] pipe = pipe();
+ FileDescriptor pipeIn = pipe[0];
+ FileDescriptor pipeOut = pipe[1];
+ try {
+ int splicedIn = spliceIn(pipeOut, handle);
+ if (splicedIn > 0) {
+ // Integer.MAX_VALUE is a special value which will result in splice forever.
+ if (len != Integer.MAX_VALUE) {
+ len -= splicedIn;
+ }
+ do {
+ int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
+ splicedIn -= splicedOut;
+ } while (splicedIn > 0);
+ if (len == 0) {
+ promise.setSuccess();
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ safeClosePipe(pipeIn);
+ safeClosePipe(pipeOut);
+ }
+ } catch (Throwable cause) {
+ promise.setFailure(cause);
+ return true;
+ }
+ }
+ }
+
+ private final class RawSocketSocketWritableByteChannel extends SocketWritableByteChannel {
+ RawSocketSocketWritableByteChannel() {
+ super(socket);
+ }
+
+ @Override
+ protected ByteBufAllocator alloc() {
+ return AbstractRawSocketStreamChannel.this.alloc();
+ }
+ }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
index 8ac21dd..6f4b76a 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
@@ -1,186 +1,118 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
package org.apache.plc4x.java.utils.rawsockets.netty;
-import com.savarese.rocksaw.net.RawSocket;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.nio.AbstractNioByteChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.logging.InternalLogger;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectableChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
-/**
- * Netty channel implementation that uses RockSaw to create a raw socket connection to implement
- * IP-socket based protocols not based on TCP or UDP.
- *
- * NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
- */
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannel extends AbstractRawSocketStreamChannel implements SocketChannel {
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
+ private final RawSocketChannelConfig config;
- // The protocol number is defined in the IP protocol and indicates the type of protocol the payload
- // the IP packet uses. This number is assigned by the IESG. A full list of the registered protocol
- // numbers can be found here: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
- private int protocolNumber;
+ private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
- private RawSocket socket;
- private InetSocketAddress localAddress;
- private InetSocketAddress remoteAddress;
-
- /**
- * Initializes a raw socket that is able to communicate with raw IPv4 and IPv6 sockets, hereby
- * allowing to implement protocols below TCP and UDP.
- *
- * For a list of public known protocol numbers see:
- * https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
- *
- * @param parent
- * @param ch
- * @param protocolNumber protocol number identifying the protocol.
- * @throws PlcIoException
- */
- public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
- super(parent, ch);
-
- this.protocolNumber = protocolNumber;
-
- try {
- socket = new RawSocket();
- socket.setIPHeaderInclude(true);
- } catch (IOException e) {
- throw new PlcIoException("Error setting up raw socket", e);
- }
+ public RawSocketChannel() {
+ super(newSocketStream(), false);
+ config = new RawSocketChannelConfig(this);
}
- /**
- * Opens a connection to the given remote address.
- *
- * @param remoteAddress
- * @param localAddress
- * @return
- * @throws Exception
- */
- @Override
- protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
- if(!(remoteAddress instanceof InetSocketAddress) || !(localAddress instanceof InetSocketAddress)) {
- throw new PlcIoException("Both remoteAddress and localAddress must be of type InetSocketAddress");
- }
-
- try {
- this.localAddress = (InetSocketAddress) localAddress;
- this.remoteAddress = (InetSocketAddress) remoteAddress;
-
- socket.open(RawSocket.PF_INET, protocolNumber);
+ public RawSocketChannel(int fd) {
+ super(fd);
+ config = new RawSocketChannelConfig(this);
+ }
- return socket.isOpen();
- } catch (IllegalStateException | IOException e) {
- return false;
- }
+ RawSocketChannel(LinuxSocket fd, boolean active) {
+ super(fd, active);
+ config = new RawSocketChannelConfig(this);
}
- @Override
- protected void doFinishConnect() throws Exception {
+ RawSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
+ super(parent, fd, remoteAddress);
+ config = new RawSocketChannelConfig(this);
+ if (parent instanceof RawSocketChannel) {
+ tcpMd5SigAddresses = ((RawSocketChannel) parent).tcpMd5SigAddresses();
+ }
}
/**
- * Opens a listening socket.
- *
- * @param localAddress
- * @throws Exception
+ * Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
*/
- @Override
- protected void doBind(SocketAddress localAddress) throws Exception {
- if(socket.isOpen()) {
- throw new PlcIoException("Raw socket already opened.");
- }
- if(localAddress instanceof InetSocketAddress) {
- this.localAddress = (InetSocketAddress) localAddress;
- socket.bind(this.localAddress.getAddress());
- } else {
- throw new PlcIoException("Unsupported type of local address. Only InetSocketAddress supported.");
- }
+ public RawSocketTcpInfo tcpInfo() {
+ return tcpInfo(new RawSocketTcpInfo());
}
/**
- * Closes the connection.
- *
- * @throws Exception
+ * Updates and returns the {@code TCP_INFO} for the current socket.
+ * See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
*/
- @Override
- protected void doDisconnect() throws Exception {
- if(socket.isOpen()) {
- socket.close();
+ public RawSocketTcpInfo tcpInfo(RawSocketTcpInfo info) {
+ try {
+ socket.getTcpInfo(info);
+ return info;
+ } catch (IOException e) {
+ throw new ChannelException(e);
}
}
@Override
- protected ChannelFuture shutdownInput() {
- return null;
- }
-
- @Override
- protected int doReadBytes(ByteBuf buf) throws Exception {
- byte[] byteBuf = new byte[1024];
- int readBytes = socket.read(byteBuf);
- buf.writeBytes(byteBuf, 0, readBytes);
- return readBytes;
+ public InetSocketAddress remoteAddress() {
+ return (InetSocketAddress) super.remoteAddress();
}
@Override
- protected int doWriteBytes(ByteBuf buf) throws Exception {
- byte[] readableBytes = new byte[buf.readableBytes()];
- buf.readBytes(readableBytes);
- socket.write(remoteAddress.getAddress(), readableBytes);
- return readableBytes.length;
+ public InetSocketAddress localAddress() {
+ return (InetSocketAddress) super.localAddress();
}
@Override
- protected long doWriteFileRegion(FileRegion region) throws Exception {
- throw new UnsupportedOperationException("doWriteFileRegion not implemented");
+ public RawSocketChannelConfig config() {
+ return config;
}
@Override
- protected SocketAddress localAddress0() {
- return localAddress;
+ public ServerSocketChannel parent() {
+ return (ServerSocketChannel) super.parent();
}
@Override
- protected SocketAddress remoteAddress0() {
- return remoteAddress;
+ protected AbstractEpollUnsafe newUnsafe() {
+ return new EpollSocketChannelUnsafe();
}
- @Override
- public ChannelConfig config() {
- return null;
+ private final class EpollSocketChannelUnsafe extends RawSocketStreamUnsafe {
+ @Override
+ protected Executor prepareToClose() {
+ try {
+ // Check isOpen() first as otherwise it will throw a RuntimeException
+ // when call getSoLinger() as the fd is not valid anymore.
+ if (isOpen() && config().getSoLinger() > 0) {
+ // We need to cancel this key of the channel so we may not end up in a eventloop spin
+ // because we try to read or write until the actual close happens which may be later due
+ // SO_LINGER handling.
+ // See https://github.com/netty/netty/issues/4449
+ ((RawSocketEventLoop) eventLoop()).remove(RawSocketChannel.this);
+ return GlobalEventExecutor.INSTANCE;
+ }
+ } catch (Throwable ignore) {
+ // Ignore the error as the underlying channel may be closed in the meantime and so
+ // getSoLinger() may produce an exception. In this case we just return null.
+ // See https://github.com/netty/netty/issues/4449
+ }
+ return null;
+ }
}
- @Override
- public boolean isActive() {
- return socket.isOpen();
+ void setTcpMd5Sig(Map<InetAddress, byte[]> keys) throws IOException {
+ tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
}
}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
new file mode 100644
index 0000000..8f77972
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.*;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static io.netty.channel.unix.Limits.SSIZE_MAX;
+
+public class RawSocketChannelConfig extends DefaultChannelConfig {
+ final AbstractRawSocketChannel channel;
+ private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
+
+ RawSocketChannelConfig(AbstractRawSocketChannel channel) {
+ super(channel);
+ this.channel = channel;
+ }
+
+ @Override
+ public Map<ChannelOption<?>, Object> getOptions() {
+ return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getOption(ChannelOption<T> option) {
+ if (option == EpollChannelOption.EPOLL_MODE) {
+ return (T) getEpollMode();
+ }
+ return super.getOption(option);
+ }
+
+ @Override
+ public <T> boolean setOption(ChannelOption<T> option, T value) {
+ validate(option, value);
+ if (option == EpollChannelOption.EPOLL_MODE) {
+ setEpollMode((EpollMode) value);
+ } else {
+ return super.setOption(option, value);
+ }
+ return true;
+ }
+
+ @Override
+ public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
+ super.setConnectTimeoutMillis(connectTimeoutMillis);
+ return this;
+ }
+
+ @Override
+ @Deprecated
+ public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
+ super.setMaxMessagesPerRead(maxMessagesPerRead);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
+ super.setWriteSpinCount(writeSpinCount);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
+ super.setAllocator(allocator);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
+ if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
+ throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
+ RecvByteBufAllocator.ExtendedHandle.class);
+ }
+ super.setRecvByteBufAllocator(allocator);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setAutoRead(boolean autoRead) {
+ super.setAutoRead(autoRead);
+ return this;
+ }
+
+ @Override
+ @Deprecated
+ public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
+ return this;
+ }
+
+ @Override
+ @Deprecated
+ public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
+ super.setWriteBufferWaterMark(writeBufferWaterMark);
+ return this;
+ }
+
+ @Override
+ public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
+ super.setMessageSizeEstimator(estimator);
+ return this;
+ }
+
+ /**
+ * Return the {@link EpollMode} used. Default is
+ * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+ * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+ * {@link EpollMode#LEVEL_TRIGGERED}.
+ */
+ public EpollMode getEpollMode() {
+ return channel.isFlagSet(Native.EPOLLET)
+ ? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
+ }
+
+ /**
+ * Set the {@link EpollMode} used. Default is
+ * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
+ * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
+ * {@link EpollMode#LEVEL_TRIGGERED}.
+ *
+ * <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
+ */
+ public EpollChannelConfig setEpollMode(EpollMode mode) {
+ if (mode == null) {
+ throw new NullPointerException("mode");
+ }
+ try {
+ switch (mode) {
+ case EDGE_TRIGGERED:
+ checkChannelNotRegistered();
+ channel.setFlag(Native.EPOLLET);
+ break;
+ case LEVEL_TRIGGERED:
+ checkChannelNotRegistered();
+ channel.clearFlag(Native.EPOLLET);
+ break;
+ default:
+ throw new Error();
+ }
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ return this;
+ }
+
+ private void checkChannelNotRegistered() {
+ if (channel.isRegistered()) {
+ throw new IllegalStateException("EpollMode can only be changed before channel is registered");
+ }
+ }
+
+ @Override
+ protected final void autoReadCleared() {
+ channel.clearEpollIn();
+ }
+
+ final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
+ this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
+ }
+
+ final long getMaxBytesPerGatheringWrite() {
+ return maxBytesPerGatheringWrite;
+ }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
similarity index 97%
copy from plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
copy to plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
index 8ac21dd..cba1ad8 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannelSav.java
@@ -38,7 +38,7 @@ import java.nio.channels.SelectableChannel;
*
* NOTE: This class is currently a WIP (Work in progress) it should only be used with great care.
*/
-public class RawSocketChannel extends AbstractNioByteChannel {
+public class RawSocketChannelSav extends AbstractNioByteChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
@@ -63,7 +63,7 @@ public class RawSocketChannel extends AbstractNioByteChannel {
* @param protocolNumber protocol number identifying the protocol.
* @throws PlcIoException
*/
- public RawSocketChannel(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
+ public RawSocketChannelSav(Channel parent, SelectableChannel ch, int protocolNumber) throws PlcIoException {
super(parent, ch);
this.protocolNumber = protocolNumber;
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
new file mode 100644
index 0000000..03e024d
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventArray.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead.
+ *
+ * typedef union epoll_data {
+ * void *ptr;
+ * int fd;
+ * uint32_t u32;
+ * uint64_t u64;
+ * } epoll_data_t;
+ *
+ * struct epoll_event {
+ * uint32_t events; // Epoll events
+ * epoll_data_t data; // User data variable
+ * };
+ *
+ * We use {@code fd} if the {@code epoll_data union} to store the actual file descriptor of an
+ * {@link AbstractRawSocketChannel} and so be able to map it later.
+ */
+final class RawSocketEventArray {
+ // Size of the epoll_event struct
+ private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent();
+ // The offsiet of the data union in the epoll_event struct
+ private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData();
+
+ private long memoryAddress;
+ private int length;
+
+ RawSocketEventArray(int length) {
+ if (length < 1) {
+ throw new IllegalArgumentException("length must be >= 1 but was " + length);
+ }
+ this.length = length;
+ memoryAddress = allocate(length);
+ }
+
+ private static long allocate(int length) {
+ return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE);
+ }
+
+ /**
+ * Return the {@code memoryAddress} which points to the start of this {@link RawSocketEventArray}.
+ */
+ long memoryAddress() {
+ return memoryAddress;
+ }
+
+ /**
+ * Return the length of the {@link RawSocketEventArray} which represent the maximum number of {@code epoll_events}
+ * that can be stored in it.
+ */
+ int length() {
+ return length;
+ }
+
+ /**
+ * Increase the storage of this {@link RawSocketEventArray}.
+ */
+ void increase() {
+ // double the size
+ length <<= 1;
+ free();
+ memoryAddress = allocate(length);
+ }
+
+ /**
+ * Free this {@link RawSocketEventArray}. Any usage after calling this method may segfault the JVM!
+ */
+ void free() {
+ PlatformDependent.freeMemory(memoryAddress);
+ }
+
+ /**
+ * Return the events for the {@code epoll_event} on this index.
+ */
+ int events(int index) {
+ return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE);
+ }
+
+ /**
+ * Return the file descriptor for the {@code epoll_event} on this index.
+ */
+ int fd(int index) {
+ return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET);
+ }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
new file mode 100644
index 0000000..ac13e50
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketEventLoop.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.utils.rawsockets.netty;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SelectStrategy;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.util.IntSupplier;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.IntObjectMap;
+import io.netty.util.concurrent.RejectedExecutionHandler;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static java.lang.Math.min;
+
+/**
+ * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
+ */
+final class RawSocketEventLoop extends SingleThreadEventLoop {
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(RawSocketEventLoop.class);
+ private static final AtomicIntegerFieldUpdater<RawSocketEventLoop> WAKEN_UP_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(RawSocketEventLoop.class, "wakenUp");
+
+ static {
+ // Ensure JNI is initialized by the time this class is loaded by this time!
+ // We use unix-common methods in this class which are backed by JNI methods.
+ Epoll.ensureAvailability();
+ }
+
+ private final FileDescriptor epollFd;
+ private final FileDescriptor eventFd;
+ private final FileDescriptor timerFd;
+ private final IntObjectMap<AbstractRawSocketChannel> channels = new IntObjectHashMap<AbstractRawSocketChannel>(4096);
+ private final boolean allowGrowing;
+ private final RawSocketEventArray events;
+ private final IovArray iovArray = new IovArray();
+ private final SelectStrategy selectStrategy;
+ private final IntSupplier selectNowSupplier = new IntSupplier() {
+ @Override
+ public int get() throws Exception {
+ return epollWaitNow();
+ }
+ };
+ private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return RawSocketEventLoop.super.pendingTasks();
+ }
+ };
+ private volatile int wakenUp;
+ private volatile int ioRatio = 50;
+
+ RawSocketEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
+ SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
+ super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
+ selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
+ if (maxEvents == 0) {
+ allowGrowing = true;
+ events = new RawSocketEventArray(4096);
+ } else {
+ allowGrowing = false;
+ events = new RawSocketEventArray(maxEvents);
+ }
+ boolean success = false;
+ FileDescriptor epollFd = null;
+ FileDescriptor eventFd = null;
+ FileDescriptor timerFd = null;
+ try {
+ this.epollFd = epollFd = Native.newEpollCreate();
+ this.eventFd = eventFd = Native.newEventFd();
+ try {
+ Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
+ }
+ this.timerFd = timerFd = Native.newTimerFd();
+ try {
+ Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ if (epollFd != null) {
+ try {
+ epollFd.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (eventFd != null) {
+ try {
+ eventFd.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (timerFd != null) {
+ try {
+ timerFd.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
+ */
+ IovArray cleanArray() {
+ iovArray.clear();
+ return iovArray;
+ }
+
+ @Override
+ protected void wakeup(boolean inEventLoop) {
+ if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
+ // write to the evfd which will then wake-up epoll_wait(...)
+ Native.eventFdWrite(eventFd.intValue(), 1L);
+ }
+ }
+
+ /**
+ * Register the given epoll with this {@link EventLoop}.
+ */
+ void add(AbstractRawSocketChannel ch) throws IOException {
+ assert inEventLoop();
+ int fd = ch.socket.intValue();
+ Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
+ channels.put(fd, ch);
+ }
+
+ /**
+ * The flags of the given epoll was modified so update the registration
+ */
+ void modify(AbstractRawSocketChannel ch) throws IOException {
+ assert inEventLoop();
+ Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
+ }
+
+ /**
+ * Deregister the given epoll from this {@link EventLoop}.
+ */
+ void remove(AbstractRawSocketChannel ch) throws IOException {
+ assert inEventLoop();
+
+ if (ch.isOpen()) {
+ int fd = ch.socket.intValue();
+ if (channels.remove(fd) != null) {
+ // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
+ // removed once the file-descriptor is closed.
+ Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
+ }
+ }
+ }
+
+ @Override
+ protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
+ // This event loop never calls takeTask()
+ return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
+ : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
+ }
+
+ @Override
+ public int pendingTasks() {
+ // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
+ // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
+ // See https://github.com/netty/netty/issues/5297
+ if (inEventLoop()) {
+ return super.pendingTasks();
+ } else {
+ return submit(pendingTasksCallable).syncUninterruptibly().getNow();
+ }
+ }
+ /**
+ * Returns the percentage of the desired amount of time spent for I/O in the event loop.
+ */
+ public int getIoRatio() {
+ return ioRatio;
+ }
+
+ /**
+ * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
+ * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
+ */
+ public void setIoRatio(int ioRatio) {
+ if (ioRatio <= 0 || ioRatio > 100) {
+ throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
+ }
+ this.ioRatio = ioRatio;
+ }
+
+ private int epollWait(boolean oldWakeup) throws IOException {
+ // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
+ // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
+ // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
+ // in pipeline.
+ if (oldWakeup && hasTasks()) {
+ return epollWaitNow();
+ }
+
+ long totalDelay = delayNanos(System.nanoTime());
+ int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
+ return Native.epollWait(epollFd, events, timerFd, delaySeconds,
+ (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
+ }
+
+ private int epollWaitNow() throws IOException {
+ return Native.epollWait(epollFd, events, timerFd, 0, 0);
+ }
+
+ @Override
+ protected void run() {
+ for (;;) {
+ try {
+ int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
+ switch (strategy) {
+ case SelectStrategy.CONTINUE:
+ continue;
+ case SelectStrategy.SELECT:
+ strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
+
+ // 'wakenUp.compareAndSet(false, true)' is always evaluated
+ // before calling 'selector.wakeup()' to reduce the wake-up
+ // overhead. (Selector.wakeup() is an expensive operation.)
+ //
+ // However, there is a race condition in this approach.
+ // The race condition is triggered when 'wakenUp' is set to
+ // true too early.
+ //
+ // 'wakenUp' is set to true too early if:
+ // 1) Selector is waken up between 'wakenUp.set(false)' and
+ // 'selector.select(...)'. (BAD)
+ // 2) Selector is waken up between 'selector.select(...)' and
+ // 'if (wakenUp.get()) { ... }'. (OK)
+ //
+ // In the first case, 'wakenUp' is set to true and the
+ // following 'selector.select(...)' will wake up immediately.
+ // Until 'wakenUp' is set to false again in the next round,
+ // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
+ // any attempt to wake up the Selector will fail, too, causing
+ // the following 'selector.select(...)' call to block
+ // unnecessarily.
+ //
+ // To fix this problem, we wake up the selector again if wakenUp
+ // is true immediately after selector.select(...).
+ // It is inefficient in that it wakes up the selector for both
+ // the first case (BAD - wake-up required) and the second case
+ // (OK - no wake-up required).
+
+ if (wakenUp == 1) {
+ Native.eventFdWrite(eventFd.intValue(), 1L);
+ }
+ // fallthrough
+ default:
+ }
+
+ final int ioRatio = this.ioRatio;
+ if (ioRatio == 100) {
+ try {
+ if (strategy > 0) {
+ processReady(events, strategy);
+ }
+ } finally {
+ // Ensure we always run tasks.
+ runAllTasks();
+ }
+ } else {
+ final long ioStartTime = System.nanoTime();
+
+ try {
+ if (strategy > 0) {
+ processReady(events, strategy);
+ }
+ } finally {
+ // Ensure we always run tasks.
+ final long ioTime = System.nanoTime() - ioStartTime;
+ runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
+ }
+ }
+ if (allowGrowing && strategy == events.length()) {
+ //increase the size of the array as we needed the whole space for the events
+ events.increase();
+ }
+ } catch (Throwable t) {
+ handleLoopException(t);
+ }
+ // Always handle shutdown even if the loop processing threw an exception.
+ try {
+ if (isShuttingDown()) {
+ closeAll();
+ if (confirmShutdown()) {
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ handleLoopException(t);
+ }
+ }
+ }
+
+ private static void handleLoopException(Throwable t) {
+ logger.warn("Unexpected exception in the selector loop.", t);
+
+ // Prevent possible consecutive immediate failures that lead to
+ // excessive CPU consumption.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+
+ private void closeAll() {
+ try {
+ epollWaitNow();
+ } catch (IOException ignore) {
+ // ignore on close
+ }
+ // Using the intermediate collection to prevent ConcurrentModificationException.
+ // In the `close()` method, the channel is deleted from `channels` map.
+ Collection<AbstractRawSocketChannel> array = new ArrayList<AbstractRawSocketChannel>(channels.size());
+
+ for (AbstractRawSocketChannel channel: channels.values()) {
+ array.add(channel);
+ }
+
+ for (AbstractRawSocketChannel ch: array) {
+ ch.unsafe().close(ch.unsafe().voidPromise());
+ }
+ }
+
+ private void processReady(RawSocketEventArray events, int ready) {
+ for (int i = 0; i < ready; i ++) {
+ final int fd = events.fd(i);
+ if (fd == eventFd.intValue()) {
+ // consume wakeup event.
+ Native.eventFdRead(fd);
+ } else if (fd == timerFd.intValue()) {
+ // consume wakeup event, necessary because the timer is added with ET mode.
+ Native.timerFdRead(fd);
+ } else {
+ final long ev = events.events(i);
+
+ AbstractRawSocketChannel ch = channels.get(fd);
+ if (ch != null) {
+ // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
+ // sure about it!
+ // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
+ // past.
+ AbstractRawSocketUnsafe unsafe = (AbstractRawSocketUnsafe) ch.unsafe();
+
+ // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
+ // to read from the file descriptor.
+ // See https://github.com/netty/netty/issues/3785
+ //
+ // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
+ // In either case epollOutReady() will do the correct thing (finish connecting, or fail
+ // the connection).
+ // See https://github.com/netty/netty/issues/3848
+ if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
+ // Force flush of data as the epoll is writable again
+ unsafe.epollOutReady();
+ }
+
+ // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
+ // See https://github.com/netty/netty/issues/4317.
+ //
+ // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
+ // try to read from the underlying file descriptor and so notify the user about the error.
+ if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
+ // The Channel is still open and there is something to read. Do it now.
+ unsafe.epollInReady();
+ }
+
+ // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
+ // we may close the channel directly or try to read more data depending on the state of the
+ // Channel and als depending on the AbstractEpollChannel subtype.
+ if ((ev & Native.EPOLLRDHUP) != 0) {
+ unsafe.epollRdHupReady();
+ }
+ } else {
+ // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
+ try {
+ Native.epollCtlDel(epollFd.intValue(), fd);
+ } catch (IOException ignore) {
+ // This can happen but is nothing we need to worry about as we only try to delete
+ // the fd from the epoll set as we not found it in our mappings. So this call to
+ // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
+ // deleted before or the file descriptor was closed before.
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup() {
+ try {
+ try {
+ epollFd.close();
+ } catch (IOException e) {
+ logger.warn("Failed to close the epoll fd.", e);
+ }
+ try {
+ eventFd.close();
+ } catch (IOException e) {
+ logger.warn("Failed to close the event fd.", e);
+ }
+ try {
+ timerFd.close();
+ } catch (IOException e) {
+ logger.warn("Failed to close the timer fd.", e);
+ }
+ } finally {
+ // release native memory
+ iovArray.release();
+ events.free();
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@plc4x.apache.org" <co...@plc4x.apache.org>.