You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/17 16:18:53 UTC
[plc4x] branch feature/new-api updated: Raw Socket works now
reasonably well, I think.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feature/new-api by this push:
new 1ff6376 Raw Socket works now reasonably well, I think.
1ff6376 is described below
commit 1ff6376fa6359963a7d91e95b1e48beaeb25e737
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 18:18:47 2019 +0200
Raw Socket works now reasonably well, I think.
---
.../utils/rawsockets/netty2/RawSocketChannel.java | 53 ++++++++++++++++++----
.../rawsockets/netty2/RawSocketChannelTest.java | 6 +--
2 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
index dfe5664..3e3c9a6 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -26,9 +26,13 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.OioByteStreamChannel;
import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.NotOpenException;
+import org.pcap4j.core.PacketListener;
import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
import org.pcap4j.core.PcapNetworkInterface;
import org.pcap4j.core.Pcaps;
+import org.pcap4j.packet.Packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +41,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeoutException;
/**
* TODO write comment
@@ -51,7 +57,8 @@ public class RawSocketChannel extends OioByteStreamChannel {
private final RawSocketChannelConfig config;
private PcapNetworkInterface nif;
private PcapHandle handle;
- private ByteBuf buffer;
+ public static ByteBuf buffer;
+ private Thread loopThread;
public RawSocketChannel() {
super(null);
@@ -74,6 +81,23 @@ public class RawSocketChannel extends OioByteStreamChannel {
nif = Pcaps.getDevByName("en0");
handle = nif.openLive(65535, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
buffer = Unpooled.buffer();
+ // Start loop in another Thread
+ loopThread = new Thread(() -> {
+ try {
+ handle.loop(-1, (PacketListener) packet -> {
+ // logger.debug("Captured Packet from PCAP with length {} bytes", packet.getRawData().length);
+ buffer.writeBytes(packet.getRawData());
+ });
+ } catch (PcapNativeException | NotOpenException e) {
+ // TODO this should close everything automatically
+ logger.error("Pcap4j loop thread died!", e);
+ pipeline().fireExceptionCaught(e);
+ } catch (InterruptedException e) {
+ logger.warn("PCAP Loop Thread was interrupted (hopefully intentionally)", e);
+ Thread.currentThread().interrupt();
+ }
+ });
+ loopThread.start();
activate(new PcapInputStream(buffer), new DiscardingOutputStream());
}
@@ -94,11 +118,23 @@ public class RawSocketChannel extends OioByteStreamChannel {
@Override
protected void doDisconnect() throws Exception {
+ this.loopThread.interrupt();
if (this.handle != null) {
this.handle.close();
}
}
+ @Override protected int doReadBytes(ByteBuf buf) throws Exception {
+ if (handle == null || !handle.isOpen()) {
+ return -1;
+ }
+ try {
+ return super.doReadBytes(buf);
+ } catch (SocketTimeoutException ignored) {
+ return 0;
+ }
+ }
+
@Override
public ChannelConfig config() {
return this.config;
@@ -137,17 +173,16 @@ public class RawSocketChannel extends OioByteStreamChannel {
@Override
public int read() throws IOException {
- logger.debug("Reading Byte...");
- while (buf.readableBytes() < 1) {
- // Do nothing
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // Timeout 10 ms
+ final long timeout = System.nanoTime() + 10_000;
+ while (System.nanoTime() < timeout) {
+ if (buf.readableBytes() > 0) {
+ return buf.readByte();
}
}
- return buf.readByte();
+ throw new SocketTimeoutException();
}
+
}
public class RawSocketUnsafe extends AbstractUnsafe {
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
index ca3a882..54029d0 100644
--- a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -60,8 +60,7 @@ public class RawSocketChannelTest {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("Hello, I received some bytes!");
- // System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+ System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
}
});
ch.pipeline().addLast(new ChannelHandlerAdapter() {
@@ -84,8 +83,9 @@ public class RawSocketChannelTest {
channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
-
+ // Prepare something to read
Thread.sleep(10_000);
+
} finally {
if (channel != null) {
channel.close().sync();