You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/17 15:42:10 UTC

[plc4x] 02/02: Worked on RawSocket Driver

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8fa3834d35c186fbb8874aa52087e36bba5e2ec2
Author: julian <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 17:41:23 2019 +0200

    Worked on RawSocket Driver
---
 .../plc4x/java/utils/rawsockets/RawIpSocket.java   |  27 +++-
 .../utils/rawsockets/netty/RawSocketChannel.java   |  45 +++++-
 .../utils/rawsockets/netty2/RawSocketChannel.java  | 167 +++++++++++++++++++++
 .../rawsockets/netty2/RawSocketChannelConfig.java  |  36 +++++
 .../rawsockets/netty2/RawSocketChannelTest.java    |  96 ++++++++++++
 5 files changed, 358 insertions(+), 13 deletions(-)

diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
index 0215e3b..804fa6a 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
@@ -22,6 +22,7 @@ import org.pcap4j.packet.namednumber.*;
 import org.pcap4j.util.ByteArrays;
 import org.pcap4j.util.LinkLayerAddress;
 import org.pcap4j.util.MacAddress;
+import org.pcap4j.util.NifSelector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,10 +32,7 @@ import java.io.InputStreamReader;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 public class RawIpSocket {
@@ -42,7 +40,7 @@ public class RawIpSocket {
     private static final Logger logger = LoggerFactory.getLogger(RawIpSocket.class);
 
     private static final int SNAPLEN = 65536;
-    private static final int READ_TIMEOUT = 10;
+    private static final int READ_TIMEOUT = 10000;
 
     private static final String GATEWAY_ONLY_NETMASK = "255.255.255.255";
 
@@ -236,8 +234,11 @@ public class RawIpSocket {
                 pool.execute(() -> {
                     try {
                         receiveHandle.loop(-1, listener);
-                    } catch (PcapNativeException | InterruptedException | NotOpenException e) {
+                    } catch (PcapNativeException | NotOpenException e) {
                         logger.error("Error receiving ARP lookup", e);
+                    } catch (InterruptedException e) {
+                        logger.error("Interrupted! Error receiving ARP lookup", e);
+                        Thread.currentThread().interrupt();
                     }
                 });
 
@@ -296,6 +297,14 @@ public class RawIpSocket {
     private FirstHop getFirstHop(InetAddress remoteAddress) throws RawSocketException {
         byte[] remoteIp = remoteAddress.getAddress();
         // Iterate over all network interfaces.
+
+
+//        try {
+//            new NifSelector().selectNetworkInterface().getLinkLayerAddresses();
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+
         try {
             // First try if we can connect to the remote device directly.
             for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -322,8 +331,12 @@ public class RawIpSocket {
                         // If the current address would be able to connect to the remote
                         // address, return this device.
                         if (matches) {
+                            if (dev.getLinkLayerAddresses().isEmpty()) {
+                                continue;
+                            }
+                            LinkLayerAddress localMacAddress = dev.getLinkLayerAddresses().iterator().next();
                             return new FirstHop(dev, localAddress.getAddress(),
-                                dev.getLinkLayerAddresses().iterator().next(),
+                                localMacAddress,
                                 getMacAddress(dev, localAddress.getAddress(), remoteAddress));
                         }
                     }
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
index 1665686..680e8c6 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
@@ -19,19 +19,34 @@ under the License.
 package org.apache.plc4x.java.utils.rawsockets.netty;
 
 import io.netty.channel.*;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.utils.rawsockets.RawIpSocket;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
 
 import java.net.SocketAddress;
 
-public class RawSocketChannel extends AbstractChannel {
+public class RawSocketChannel extends OioByteStreamChannel {
 
     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
 
+    PcapHandle handle;
+
     protected class RawByteUnsafe extends AbstractChannel.AbstractUnsafe {
         @Override
         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
-            //getPipeline()
-            promise.setSuccess();
+            // Connect?!
+            try {
+                doConnect(remoteAddress, localAddress);
+                pipeline().fireChannelActive();
+                promise.setSuccess();
+            } catch (Exception e) {
+                promise.setFailure(e);
+            }
         }
+
     }
 
     public RawSocketChannel() {
@@ -49,6 +64,12 @@ public class RawSocketChannel extends AbstractChannel {
     }
 
     @Override
+    protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        PcapNetworkInterface nif = Pcaps.findAllDevs().get(0);
+        this.handle = nif.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
+    }
+
+    @Override
     protected SocketAddress localAddress0() {
         return null;
     }
@@ -65,17 +86,19 @@ public class RawSocketChannel extends AbstractChannel {
 
     @Override
     protected void doDisconnect() throws Exception {
-        System.out.println("disconnect");
+        throw new NotImplementedException("");
     }
 
     @Override
     protected void doClose() throws Exception {
-        System.out.println("close");
+        if (this.handle != null) {
+            this.handle.close();
+        }
     }
 
     @Override
     protected void doBeginRead() throws Exception {
-        System.out.println("beginRead");
+        this.handle.getNextRawPacketEx();
     }
 
     @Override
@@ -103,4 +126,14 @@ public class RawSocketChannel extends AbstractChannel {
         return METADATA;
     }
 
+    @Override
+    protected boolean isInputShutdown() {
+        return false;
+    }
+
+    @Override
+    protected ChannelFuture shutdownInput() {
+        return null;
+    }
+
 }
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
new file mode 100644
index 0000000..dfe5664
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannel extends OioByteStreamChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawSocketChannel.class);
+
+    private final RawSocketChannelConfig config;
+    private PcapNetworkInterface nif;
+    private PcapHandle handle;
+    private ByteBuf buffer;
+
+    public RawSocketChannel() {
+        super(null);
+        config = new RawSocketChannelConfig(this);
+    }
+
+    @Override
+    protected boolean isInputShutdown() {
+        return false;
+    }
+
+    @Override
+    protected ChannelFuture shutdownInput() {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        logger.debug("Connecting...");
+        nif = Pcaps.getDevByName("en0");
+        handle = nif.openLive(65535, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
+        buffer = Unpooled.buffer();
+        activate(new PcapInputStream(buffer), new DiscardingOutputStream());
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+        throw new UnsupportedOperationException("");
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+        if (this.handle != null) {
+            this.handle.close();
+        }
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return this.config;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return true;
+    }
+
+    @Override
+    protected AbstractUnsafe newUnsafe() {
+        return new RawSocketUnsafe();
+    }
+
+    private static class DiscardingOutputStream extends OutputStream {
+        @Override
+        public void write(int b) throws IOException {
+            // discard
+            logger.debug("Discarding {}", b);
+        }
+    }
+
+    private static class PcapInputStream extends InputStream {
+
+        final ByteBuf buf;
+
+        private PcapInputStream(ByteBuf buf) {
+            this.buf = buf;
+        }
+
+        @Override
+        public int available() throws IOException {
+            return buf.readableBytes();
+        }
+
+        @Override
+        public int read() throws IOException {
+            logger.debug("Reading Byte...");
+            while (buf.readableBytes() < 1) {
+                // Do nothing
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return buf.readByte();
+        }
+    }
+
+    public class RawSocketUnsafe extends AbstractUnsafe {
+
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            try {
+                doConnect(remoteAddress, localAddress);
+                pipeline().fireChannelActive();
+                promise.setSuccess();
+            } catch (Exception e) {
+                promise.setFailure(e);
+            }
+        }
+
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java
new file mode 100644
index 0000000..18b7022
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.channel.*;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelConfig extends DefaultChannelConfig {
+
+    public RawSocketChannelConfig(Channel channel) {
+        super(channel);
+    }
+
+}
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
new file mode 100644
index 0000000..ca3a882
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawSocketChannelTest.class);
+
+    @Test
+    public void doConnect() throws InterruptedException {
+        Channel channel = null;
+        final EventLoopGroup workerGroup = new OioEventLoopGroup();
+        try {
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.group(workerGroup);
+            bootstrap.channel(RawSocketChannel.class);
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+            bootstrap.option(ChannelOption.TCP_NODELAY, true);
+            // TODO we should use an explicit (configurable?) timeout here
+            // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+            bootstrap.handler(new ChannelInitializer<RawSocketChannel>() {
+                @Override
+                protected void initChannel(RawSocketChannel ch) throws Exception {
+                    System.out.println("Initialize Buffer!");
+                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
+                        @Override
+                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                            System.out.println("Hello, I received some bytes!");
+                            // System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+                        }
+                    });
+                    ch.pipeline().addLast(new ChannelHandlerAdapter() {
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                            cause.printStackTrace();
+                        }
+                    });
+                }
+            });
+            // Start the client.
+            final ChannelFuture f = bootstrap.connect("127.0.0.1", 1234);
+            // Wait for sync
+            f.sync();
+            // Wait till the session is finished initializing.
+            channel = f.channel();
+
+            System.out.println("Channel is connected and ready to use...");
+
+
+            channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
+
+
+            Thread.sleep(10_000);
+        } finally {
+            if (channel != null) {
+                channel.close().sync();
+            }
+            workerGroup.shutdownGracefully().sync();
+        }
+    }
+}
\ No newline at end of file