You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/17 15:42:10 UTC
[plc4x] 02/02: Worked on RawSocket Driver
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8fa3834d35c186fbb8874aa52087e36bba5e2ec2
Author: julian <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 17:41:23 2019 +0200
Worked on RawSocket Driver
---
.../plc4x/java/utils/rawsockets/RawIpSocket.java | 27 +++-
.../utils/rawsockets/netty/RawSocketChannel.java | 45 +++++-
.../utils/rawsockets/netty2/RawSocketChannel.java | 167 +++++++++++++++++++++
.../rawsockets/netty2/RawSocketChannelConfig.java | 36 +++++
.../rawsockets/netty2/RawSocketChannelTest.java | 96 ++++++++++++
5 files changed, 358 insertions(+), 13 deletions(-)
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
index 0215e3b..804fa6a 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/RawIpSocket.java
@@ -22,6 +22,7 @@ import org.pcap4j.packet.namednumber.*;
import org.pcap4j.util.ByteArrays;
import org.pcap4j.util.LinkLayerAddress;
import org.pcap4j.util.MacAddress;
+import org.pcap4j.util.NifSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,7 @@ import java.io.InputStreamReader;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.*;
public class RawIpSocket {
@@ -42,7 +40,7 @@ public class RawIpSocket {
private static final Logger logger = LoggerFactory.getLogger(RawIpSocket.class);
private static final int SNAPLEN = 65536;
- private static final int READ_TIMEOUT = 10;
+ private static final int READ_TIMEOUT = 10000;
private static final String GATEWAY_ONLY_NETMASK = "255.255.255.255";
@@ -236,8 +234,11 @@ public class RawIpSocket {
pool.execute(() -> {
try {
receiveHandle.loop(-1, listener);
- } catch (PcapNativeException | InterruptedException | NotOpenException e) {
+ } catch (PcapNativeException | NotOpenException e) {
logger.error("Error receiving ARP lookup", e);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted! Error receiving ARP lookup", e);
+ Thread.currentThread().interrupt();
}
});
@@ -296,6 +297,14 @@ public class RawIpSocket {
private FirstHop getFirstHop(InetAddress remoteAddress) throws RawSocketException {
byte[] remoteIp = remoteAddress.getAddress();
// Iterate over all network interfaces.
+
+
+// try {
+// new NifSelector().selectNetworkInterface().getLinkLayerAddresses();
+// } catch (IOException e) {
+// e.printStackTrace();
+// }
+
try {
// First try if we can connect to the remote device directly.
for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -322,8 +331,12 @@ public class RawIpSocket {
// If the current address would be able to connect to the remote
// address, return this device.
if (matches) {
+ if (dev.getLinkLayerAddresses().isEmpty()) {
+ continue;
+ }
+ LinkLayerAddress localMacAddress = dev.getLinkLayerAddresses().iterator().next();
return new FirstHop(dev, localAddress.getAddress(),
- dev.getLinkLayerAddresses().iterator().next(),
+ localMacAddress,
getMacAddress(dev, localAddress.getAddress(), remoteAddress));
}
}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
index 1665686..680e8c6 100644
--- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketChannel.java
@@ -19,19 +19,34 @@ under the License.
package org.apache.plc4x.java.utils.rawsockets.netty;
import io.netty.channel.*;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.utils.rawsockets.RawIpSocket;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
import java.net.SocketAddress;
-public class RawSocketChannel extends AbstractChannel {
+public class RawSocketChannel extends OioByteStreamChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
+ PcapHandle handle;
+
protected class RawByteUnsafe extends AbstractChannel.AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
- //getPipeline()
- promise.setSuccess();
+ // Connect?!
+ try {
+ doConnect(remoteAddress, localAddress);
+ pipeline().fireChannelActive();
+ promise.setSuccess();
+ } catch (Exception e) {
+ promise.setFailure(e);
+ }
}
+
}
public RawSocketChannel() {
@@ -49,6 +64,12 @@ public class RawSocketChannel extends AbstractChannel {
}
@Override
+ protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+ PcapNetworkInterface nif = Pcaps.findAllDevs().get(0);
+ this.handle = nif.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
+ }
+
+ @Override
protected SocketAddress localAddress0() {
return null;
}
@@ -65,17 +86,19 @@ public class RawSocketChannel extends AbstractChannel {
@Override
protected void doDisconnect() throws Exception {
- System.out.println("disconnect");
+ throw new NotImplementedException("");
}
@Override
protected void doClose() throws Exception {
- System.out.println("close");
+ if (this.handle != null) {
+ this.handle.close();
+ }
}
@Override
protected void doBeginRead() throws Exception {
- System.out.println("beginRead");
+ this.handle.getNextRawPacketEx();
}
@Override
@@ -103,4 +126,14 @@ public class RawSocketChannel extends AbstractChannel {
return METADATA;
}
+ @Override
+ protected boolean isInputShutdown() {
+ return false;
+ }
+
+ @Override
+ protected ChannelFuture shutdownInput() {
+ return null;
+ }
+
}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
new file mode 100644
index 0000000..dfe5664
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannel extends OioByteStreamChannel {
+
+ private static final Logger logger = LoggerFactory.getLogger(RawSocketChannel.class);
+
+ private final RawSocketChannelConfig config;
+ private PcapNetworkInterface nif;
+ private PcapHandle handle;
+ private ByteBuf buffer;
+
+ public RawSocketChannel() {
+ super(null);
+ config = new RawSocketChannelConfig(this);
+ }
+
+ @Override
+ protected boolean isInputShutdown() {
+ return false;
+ }
+
+ @Override
+ protected ChannelFuture shutdownInput() {
+ throw new NotImplementedException("");
+ }
+
+ @Override
+ protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+ logger.debug("Connecting...");
+ nif = Pcaps.getDevByName("en0");
+ handle = nif.openLive(65535, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 10);
+ buffer = Unpooled.buffer();
+ activate(new PcapInputStream(buffer), new DiscardingOutputStream());
+ }
+
+ @Override
+ protected SocketAddress localAddress0() {
+ return new InetSocketAddress(1234);
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return new InetSocketAddress(1234);
+ }
+
+ @Override
+ protected void doBind(SocketAddress localAddress) throws Exception {
+ throw new UnsupportedOperationException("");
+ }
+
+ @Override
+ protected void doDisconnect() throws Exception {
+ if (this.handle != null) {
+ this.handle.close();
+ }
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return this.config;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ protected AbstractUnsafe newUnsafe() {
+ return new RawSocketUnsafe();
+ }
+
+ private static class DiscardingOutputStream extends OutputStream {
+ @Override
+ public void write(int b) throws IOException {
+ // discard
+ logger.debug("Discarding {}", b);
+ }
+ }
+
+ private static class PcapInputStream extends InputStream {
+
+ final ByteBuf buf;
+
+ private PcapInputStream(ByteBuf buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return buf.readableBytes();
+ }
+
+ @Override
+ public int read() throws IOException {
+ logger.debug("Reading Byte...");
+ while (buf.readableBytes() < 1) {
+ // Do nothing
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return buf.readByte();
+ }
+ }
+
+ public class RawSocketUnsafe extends AbstractUnsafe {
+
+ @Override
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ try {
+ doConnect(remoteAddress, localAddress);
+ pipeline().fireChannelActive();
+ promise.setSuccess();
+ } catch (Exception e) {
+ promise.setFailure(e);
+ }
+ }
+
+ }
+}
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java
new file mode 100644
index 0000000..18b7022
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.channel.*;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelConfig extends DefaultChannelConfig {
+
+ public RawSocketChannelConfig(Channel channel) {
+ super(channel);
+ }
+
+}
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
new file mode 100644
index 0000000..ca3a882
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(RawSocketChannelTest.class);
+
+ @Test
+ public void doConnect() throws InterruptedException {
+ Channel channel = null;
+ final EventLoopGroup workerGroup = new OioEventLoopGroup();
+ try {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(RawSocketChannel.class);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ // TODO we should use an explicit (configurable?) timeout here
+ // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+ bootstrap.handler(new ChannelInitializer<RawSocketChannel>() {
+ @Override
+ protected void initChannel(RawSocketChannel ch) throws Exception {
+ System.out.println("Initialize Buffer!");
+ ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ System.out.println("Hello, I received some bytes!");
+ // System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+ }
+ });
+ ch.pipeline().addLast(new ChannelHandlerAdapter() {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ }
+ });
+ }
+ });
+ // Start the client.
+ final ChannelFuture f = bootstrap.connect("127.0.0.1", 1234);
+ // Wait for sync
+ f.sync();
+ // Wait till the session is finished initializing.
+ channel = f.channel();
+
+ System.out.println("Channel is connected and ready to use...");
+
+
+ channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
+
+
+ Thread.sleep(10_000);
+ } finally {
+ if (channel != null) {
+ channel.close().sync();
+ }
+ workerGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file