You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/18 13:07:09 UTC

[plc4x] 01/03: Raw Socket works now reasonaly well, I think.

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit a9669a0061f2dbdc70b71cef966838d0a1570ea7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 18:18:47 2019 +0200

    Raw Socket works now reasonaly well, I think.
---
 .../utils/rawsockets/netty2/RawSocketChannel.java  | 202 +++++++++++++++++++++
 .../rawsockets/netty2/RawSocketChannelTest.java    |  96 ++++++++++
 2 files changed, 298 insertions(+)

diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
new file mode 100644
index 0000000..3e3c9a6
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.NotOpenException;
+import org.pcap4j.core.PacketListener;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
+import org.pcap4j.packet.Packet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannel extends OioByteStreamChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawSocketChannel.class);
+
+    private final RawSocketChannelConfig config;
+    private PcapNetworkInterface nif;
+    private PcapHandle handle;
+    public static ByteBuf buffer;
+    private Thread loopThread;
+
+    public RawSocketChannel() {
+        super(null);
+        config = new RawSocketChannelConfig(this);
+    }
+
+    @Override
+    protected boolean isInputShutdown() {
+        return false;
+    }
+
+    @Override
+    protected ChannelFuture shutdownInput() {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        logger.debug("Connecting...");
+        nif = Pcaps.getDevByName("en0");
+        handle = nif.openLive(65535, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
+        buffer = Unpooled.buffer();
+        // Start loop in another Thread
+        loopThread = new Thread(() -> {
+            try {
+                handle.loop(-1, (PacketListener) packet -> {
+                    // logger.debug("Captured Packet from PCAP with length {} bytes", packet.getRawData().length);
+                    buffer.writeBytes(packet.getRawData());
+                });
+            } catch (PcapNativeException | NotOpenException e) {
+                // TODO this should close everything automatically
+                logger.error("Pcap4j loop thread died!", e);
+                pipeline().fireExceptionCaught(e);
+            } catch (InterruptedException e) {
+                logger.warn("PCAP Loop Thread was interrupted (hopefully intentionally)", e);
+                Thread.currentThread().interrupt();
+            }
+        });
+        loopThread.start();
+        activate(new PcapInputStream(buffer), new DiscardingOutputStream());
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+        throw new UnsupportedOperationException("");
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+        this.loopThread.interrupt();
+        if (this.handle != null) {
+            this.handle.close();
+        }
+    }
+
+    @Override protected int doReadBytes(ByteBuf buf) throws Exception {
+        if (handle == null || !handle.isOpen()) {
+            return -1;
+        }
+        try {
+            return super.doReadBytes(buf);
+        } catch (SocketTimeoutException ignored) {
+            return 0;
+        }
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return this.config;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return true;
+    }
+
+    @Override
+    protected AbstractUnsafe newUnsafe() {
+        return new RawSocketUnsafe();
+    }
+
+    private static class DiscardingOutputStream extends OutputStream {
+        @Override
+        public void write(int b) throws IOException {
+            // discard
+            logger.debug("Discarding {}", b);
+        }
+    }
+
+    private static class PcapInputStream extends InputStream {
+
+        final ByteBuf buf;
+
+        private PcapInputStream(ByteBuf buf) {
+            this.buf = buf;
+        }
+
+        @Override
+        public int available() throws IOException {
+            return buf.readableBytes();
+        }
+
+        @Override
+        public int read() throws IOException {
+            // Timeout 10 ms
+            final long timeout = System.nanoTime() + 10_000;
+            while (System.nanoTime() < timeout) {
+                if (buf.readableBytes() > 0) {
+                    return buf.readByte();
+                }
+            }
+            throw new SocketTimeoutException();
+        }
+
+    }
+
+    public class RawSocketUnsafe extends AbstractUnsafe {
+
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            try {
+                doConnect(remoteAddress, localAddress);
+                pipeline().fireChannelActive();
+                promise.setSuccess();
+            } catch (Exception e) {
+                promise.setFailure(e);
+            }
+        }
+
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
new file mode 100644
index 0000000..54029d0
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawSocketChannelTest.class);
+
+    @Test
+    public void doConnect() throws InterruptedException {
+        Channel channel = null;
+        final EventLoopGroup workerGroup = new OioEventLoopGroup();
+        try {
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.group(workerGroup);
+            bootstrap.channel(RawSocketChannel.class);
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+            bootstrap.option(ChannelOption.TCP_NODELAY, true);
+            // TODO we should use an explicit (configurable?) timeout here
+            // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+            bootstrap.handler(new ChannelInitializer<RawSocketChannel>() {
+                @Override
+                protected void initChannel(RawSocketChannel ch) throws Exception {
+                    System.out.println("Initialize Buffer!");
+                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
+                        @Override
+                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                            System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+                        }
+                    });
+                    ch.pipeline().addLast(new ChannelHandlerAdapter() {
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                            cause.printStackTrace();
+                        }
+                    });
+                }
+            });
+            // Start the client.
+            final ChannelFuture f = bootstrap.connect("127.0.0.1", 1234);
+            // Wait for sync
+            f.sync();
+            // Wait till the session is finished initializing.
+            channel = f.channel();
+
+            System.out.println("Channel is connected and ready to use...");
+
+
+            channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
+
+            // Prepare something to read
+            Thread.sleep(10_000);
+
+        } finally {
+            if (channel != null) {
+                channel.close().sync();
+            }
+            workerGroup.shutdownGracefully().sync();
+        }
+    }
+}
\ No newline at end of file