You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/17 16:18:53 UTC

[plc4x] branch feature/new-api updated: Raw Socket works now reasonably well, I think.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/new-api by this push:
     new 1ff6376  Raw Socket works now reasonably well, I think.
1ff6376 is described below

commit 1ff6376fa6359963a7d91e95b1e48beaeb25e737
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 18:18:47 2019 +0200

    Raw Socket works now reasonably well, I think.
---
 .../utils/rawsockets/netty2/RawSocketChannel.java  | 53 ++++++++++++++++++----
 .../rawsockets/netty2/RawSocketChannelTest.java    |  6 +--
 2 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
index dfe5664..3e3c9a6 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -26,9 +26,13 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.oio.OioByteStreamChannel;
 import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.NotOpenException;
+import org.pcap4j.core.PacketListener;
 import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
 import org.pcap4j.core.PcapNetworkInterface;
 import org.pcap4j.core.Pcaps;
+import org.pcap4j.packet.Packet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +41,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeoutException;
 
 /**
  * TODO write comment
@@ -51,7 +57,8 @@ public class RawSocketChannel extends OioByteStreamChannel {
     private final RawSocketChannelConfig config;
     private PcapNetworkInterface nif;
     private PcapHandle handle;
-    private ByteBuf buffer;
+    public static ByteBuf buffer;
+    private Thread loopThread;
 
     public RawSocketChannel() {
         super(null);
@@ -74,6 +81,23 @@ public class RawSocketChannel extends OioByteStreamChannel {
         nif = Pcaps.getDevByName("en0");
         handle = nif.openLive(65535, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
         buffer = Unpooled.buffer();
+        // Start loop in another Thread
+        loopThread = new Thread(() -> {
+            try {
+                handle.loop(-1, (PacketListener) packet -> {
+                    // logger.debug("Captured Packet from PCAP with length {} bytes", packet.getRawData().length);
+                    buffer.writeBytes(packet.getRawData());
+                });
+            } catch (PcapNativeException | NotOpenException e) {
+                // TODO this should close everything automatically
+                logger.error("Pcap4j loop thread died!", e);
+                pipeline().fireExceptionCaught(e);
+            } catch (InterruptedException e) {
+                logger.warn("PCAP Loop Thread was interrupted (hopefully intentionally)", e);
+                Thread.currentThread().interrupt();
+            }
+        });
+        loopThread.start();
         activate(new PcapInputStream(buffer), new DiscardingOutputStream());
     }
 
@@ -94,11 +118,23 @@ public class RawSocketChannel extends OioByteStreamChannel {
 
     @Override
     protected void doDisconnect() throws Exception {
+        this.loopThread.interrupt();
         if (this.handle != null) {
             this.handle.close();
         }
     }
 
+    @Override protected int doReadBytes(ByteBuf buf) throws Exception {
+        if (handle == null || !handle.isOpen()) {
+            return -1;
+        }
+        try {
+            return super.doReadBytes(buf);
+        } catch (SocketTimeoutException ignored) {
+            return 0;
+        }
+    }
+
     @Override
     public ChannelConfig config() {
         return this.config;
@@ -137,17 +173,16 @@ public class RawSocketChannel extends OioByteStreamChannel {
 
         @Override
         public int read() throws IOException {
-            logger.debug("Reading Byte...");
-            while (buf.readableBytes() < 1) {
-                // Do nothing
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+            // Timeout 10 microseconds (10_000 ns)
+            final long timeout = System.nanoTime() + 10_000;
+            while (System.nanoTime() < timeout) {
+                if (buf.readableBytes() > 0) {
+                    return buf.readByte();
                 }
             }
-            return buf.readByte();
+            throw new SocketTimeoutException();
         }
+
     }
 
     public class RawSocketUnsafe extends AbstractUnsafe {
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
index ca3a882..54029d0 100644
--- a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -60,8 +60,7 @@ public class RawSocketChannelTest {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-                            System.out.println("Hello, I received some bytes!");
-                            // System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+                            System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
                         }
                     });
                     ch.pipeline().addLast(new ChannelHandlerAdapter() {
@@ -84,8 +83,9 @@ public class RawSocketChannelTest {
 
             channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
 
-
+            // Prepare something to read
             Thread.sleep(10_000);
+
         } finally {
             if (channel != null) {
                 channel.close().sync();