You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/18 13:07:09 UTC
[plc4x] 01/03: Raw Socket works now reasonably well, I think.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit a9669a0061f2dbdc70b71cef966838d0a1570ea7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Aug 17 18:18:47 2019 +0200
Raw Socket works now reasonably well, I think.
---
.../utils/rawsockets/netty2/RawSocketChannel.java | 202 +++++++++++++++++++++
.../rawsockets/netty2/RawSocketChannelTest.java | 96 ++++++++++
2 files changed, 298 insertions(+)
diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
new file mode 100644
index 0000000..3e3c9a6
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannel.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.oio.OioByteStreamChannel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.pcap4j.core.NotOpenException;
+import org.pcap4j.core.PacketListener;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
+import org.pcap4j.packet.Packet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Netty OIO channel whose inbound byte stream consists of the raw frames captured
+ * from a network device through pcap4j.
+ * <p>
+ * On connect the channel opens the device in promiscuous mode and starts a dedicated
+ * thread that appends the raw bytes of every captured packet to an in-memory buffer.
+ * The channel reads from that buffer through a small {@link InputStream} adapter.
+ * Everything written to the channel is discarded.
+ * <p>
+ * NOTE(review): the device name is hard-coded and the local/remote addresses are
+ * dummy values; {@link #isOpen()} always reports {@code true}.
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannel extends OioByteStreamChannel {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawSocketChannel.class);
+
+    /** Name of the device to capture on. TODO make this configurable. */
+    private static final String DEVICE_NAME = "en0";
+    /** Maximum number of bytes captured per packet. */
+    private static final int SNAPSHOT_LENGTH = 65535;
+    /** Read timeout handed to pcap when opening the device, in milliseconds. */
+    private static final int PCAP_TIMEOUT_MILLIS = 10;
+
+    private final RawSocketChannelConfig config;
+    private PcapNetworkInterface nif;
+    private PcapHandle handle;
+    /**
+     * Capture buffer of the most recently connected channel.
+     * <p>
+     * Kept only for backward compatibility: the channel itself no longer reads or
+     * writes through this static reference, because a second connect would otherwise
+     * redirect the capture thread of the first channel. Access to the buffer must be
+     * synchronized on the buffer instance.
+     */
+    public static ByteBuf buffer;
+    private Thread loopThread;
+
+    public RawSocketChannel() {
+        super(null);
+        config = new RawSocketChannelConfig(this);
+    }
+
+    @Override
+    protected boolean isInputShutdown() {
+        return false;
+    }
+
+    @Override
+    protected ChannelFuture shutdownInput() {
+        throw new NotImplementedException("");
+    }
+
+    /**
+     * Opens the capture device and starts the capture thread.
+     * Both address arguments are currently ignored.
+     *
+     * @throws IOException if the capture device cannot be found
+     * @throws Exception   if pcap fails to open the device
+     */
+    @Override
+    protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        logger.debug("Connecting...");
+        nif = Pcaps.getDevByName(DEVICE_NAME);
+        if (nif == null) {
+            throw new IOException("Network device '" + DEVICE_NAME + "' not found");
+        }
+        handle = nif.openLive(SNAPSHOT_LENGTH, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, PCAP_TIMEOUT_MILLIS);
+
+        // Bind the capture thread and the input stream to the same per-connection
+        // objects instead of going through mutable (static) fields.
+        final PcapHandle captureHandle = handle;
+        final ByteBuf captureBuffer = Unpooled.buffer();
+        buffer = captureBuffer;
+
+        // Start loop in another Thread
+        loopThread = new Thread(() -> {
+            try {
+                captureHandle.loop(-1, (PacketListener) packet -> {
+                    final byte[] rawData = packet.getRawData();
+                    // The reader runs on a different thread; ByteBuf is not thread-safe.
+                    synchronized (captureBuffer) {
+                        captureBuffer.writeBytes(rawData);
+                    }
+                });
+            } catch (PcapNativeException | NotOpenException e) {
+                // TODO this should close everything automatically
+                logger.error("Pcap4j loop thread died!", e);
+                pipeline().fireExceptionCaught(e);
+            } catch (InterruptedException e) {
+                logger.warn("PCAP Loop Thread was interrupted (hopefully intentionally)", e);
+                Thread.currentThread().interrupt();
+            }
+        }, "pcap-capture-loop");
+        loopThread.start();
+        activate(new PcapInputStream(captureBuffer), new DiscardingOutputStream());
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return new InetSocketAddress(1234);
+    }
+
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+        throw new UnsupportedOperationException("");
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+        stopCapture();
+    }
+
+    /**
+     * Releases the capture resources before closing the streams. Closing a channel
+     * does not go through {@link #doDisconnect()}, so without this the pcap handle
+     * and the capture thread would outlive the channel.
+     */
+    @Override
+    protected void doClose() throws Exception {
+        try {
+            stopCapture();
+        } finally {
+            super.doClose();
+        }
+    }
+
+    /**
+     * Stops the capture loop and closes the pcap handle. Safe to call repeatedly and
+     * before the channel was ever connected.
+     */
+    private void stopCapture() {
+        final Thread captureThread = this.loopThread;
+        if (captureThread != null) {
+            captureThread.interrupt();
+        }
+        final PcapHandle captureHandle = this.handle;
+        if (captureHandle != null && captureHandle.isOpen()) {
+            try {
+                // Ask the native loop to return; interrupting the thread alone is not
+                // guaranteed to end it.
+                captureHandle.breakLoop();
+            } catch (NotOpenException e) {
+                logger.debug("Pcap handle was already closed", e);
+            }
+            captureHandle.close();
+        }
+    }
+
+    /**
+     * Reads captured bytes into {@code buf}.
+     *
+     * @return the number of bytes read, {@code 0} if nothing arrived within the read
+     * timeout, or {@code -1} if the capture handle is missing or closed
+     */
+    @Override
+    protected int doReadBytes(ByteBuf buf) throws Exception {
+        if (handle == null || !handle.isOpen()) {
+            return -1;
+        }
+        try {
+            return super.doReadBytes(buf);
+        } catch (SocketTimeoutException ignored) {
+            // No data captured within the timeout; not an error.
+            return 0;
+        }
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return this.config;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return true;
+    }
+
+    @Override
+    protected AbstractUnsafe newUnsafe() {
+        return new RawSocketUnsafe();
+    }
+
+    /** Output stream that drops everything written to it. */
+    private static class DiscardingOutputStream extends OutputStream {
+        @Override
+        public void write(int b) throws IOException {
+            // discard
+            logger.debug("Discarding {}", b);
+        }
+    }
+
+    /**
+     * Input stream view on the capture buffer. All buffer access is synchronized on
+     * the buffer instance because the capture thread appends to it concurrently.
+     */
+    private static class PcapInputStream extends InputStream {
+
+        /** How long {@link #read()} waits for data: 10 ms, expressed in nanoseconds. */
+        private static final long READ_TIMEOUT_NANOS = 10_000_000L;
+
+        final ByteBuf buf;
+
+        private PcapInputStream(ByteBuf buf) {
+            this.buf = buf;
+        }
+
+        @Override
+        public int available() throws IOException {
+            synchronized (buf) {
+                return buf.readableBytes();
+            }
+        }
+
+        /**
+         * Returns the next captured byte as a value in the range 0-255.
+         *
+         * @throws SocketTimeoutException if no byte became available within the timeout
+         */
+        @Override
+        public int read() throws IOException {
+            final long deadline = System.nanoTime() + READ_TIMEOUT_NANOS;
+            do {
+                synchronized (buf) {
+                    if (buf.isReadable()) {
+                        // Must be unsigned: a signed 0xFF would be -1, which callers
+                        // interpret as end of stream.
+                        final int value = buf.readUnsignedByte();
+                        if (!buf.isReadable()) {
+                            // Fully drained: rewind so the buffer does not grow forever.
+                            buf.clear();
+                        }
+                        return value;
+                    }
+                }
+                Thread.yield();
+                // Subtraction keeps the comparison correct if nanoTime wraps around.
+            } while (System.nanoTime() - deadline < 0);
+            throw new SocketTimeoutException();
+        }
+
+    }
+
+    /** Unsafe implementation that performs the connect synchronously. */
+    public class RawSocketUnsafe extends AbstractUnsafe {
+
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            try {
+                doConnect(remoteAddress, localAddress);
+                pipeline().fireChannelActive();
+                // try* variants do not throw if the promise was completed or cancelled.
+                promise.trySuccess();
+            } catch (Exception e) {
+                // Do not leak a half-opened capture when the connect fails midway.
+                stopCapture();
+                promise.tryFailure(e);
+            }
+        }
+
+    }
+}
diff --git a/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
new file mode 100644
index 0000000..54029d0
--- /dev/null
+++ b/plc4j/utils/raw-sockets/src/test/java/org/apache/plc4x/java/utils/rawsockets/netty2/RawSocketChannelTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.utils.rawsockets.netty2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-16
+ */
+public class RawSocketChannelTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(RawSocketChannelTest.class);
+
+ @Test
+ public void doConnect() throws InterruptedException {
+ Channel channel = null;
+ final EventLoopGroup workerGroup = new OioEventLoopGroup();
+ try {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(RawSocketChannel.class);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ // TODO we should use an explicit (configurable?) timeout here
+ // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+ bootstrap.handler(new ChannelInitializer<RawSocketChannel>() {
+ @Override
+ protected void initChannel(RawSocketChannel ch) throws Exception {
+ System.out.println("Initialize Buffer!");
+ ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ System.out.println(ByteBufUtil.prettyHexDump(((ByteBuf) msg)));
+ }
+ });
+ ch.pipeline().addLast(new ChannelHandlerAdapter() {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ }
+ });
+ }
+ });
+ // Start the client.
+ final ChannelFuture f = bootstrap.connect("127.0.0.1", 1234);
+ // Wait for sync
+ f.sync();
+ // Wait till the session is finished initializing.
+ channel = f.channel();
+
+ System.out.println("Channel is connected and ready to use...");
+
+
+ channel.writeAndFlush(Unpooled.wrappedBuffer("Hallo".getBytes()));
+
+ // Prepare something to read
+ Thread.sleep(10_000);
+
+ } finally {
+ if (channel != null) {
+ channel.close().sync();
+ }
+ workerGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file