You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2022/10/06 15:23:56 UTC

[plc4x] 11/13: fix(plc4j/profinet): Cleaned up the message send and receive interface

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch plc4j/profinet
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit f90225bd43321ff7c9f014e6fd73344148b54fbe
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Sun Sep 18 13:30:24 2022 -0600

    fix(plc4j/profinet): Cleaned up the message send and receive interface
---
 .../java/api/messages/PlcDiscoveryItemHandler.java |   2 +
 .../profinet/config/ProfinetConfiguration.java     |  23 ++
 .../java/profinet/device/ProfinetCallable.java     |  10 +
 .../plc4x/java/profinet/device/ProfinetDevice.java | 301 +++++++++++++++++++++
 .../device/ProfinetDeviceMessageHandler.java       |  51 ++++
 .../profinet/device/ProfinetMessageWrapper.java    |  48 ++++
 .../profinet/protocol/ProfinetProtocolLogic.java   | 133 ++++-----
 .../resources/protocols/profinet/profinet.mspec    |   8 +
 8 files changed, 511 insertions(+), 65 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcDiscoveryItemHandler.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcDiscoveryItemHandler.java
index bb2bb0196..9aa33d4ee 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcDiscoveryItemHandler.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcDiscoveryItemHandler.java
@@ -18,6 +18,8 @@
  */
 package org.apache.plc4x.java.api.messages;
 
+import java.util.List;
+
 public interface PlcDiscoveryItemHandler {
 
     void handle(PlcDiscoveryItem discoveryItem);
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/config/ProfinetConfiguration.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/config/ProfinetConfiguration.java
index ee3c0c1c3..e262b7ce6 100644
--- a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/config/ProfinetConfiguration.java
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/config/ProfinetConfiguration.java
@@ -18,10 +18,21 @@
  */
 package org.apache.plc4x.java.profinet.config;
 
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.plc4x.java.profinet.device.ProfinetDevice;
+import org.apache.plc4x.java.profinet.readwrite.MacAddress;
 import org.apache.plc4x.java.spi.configuration.Configuration;
+import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
+import org.apache.plc4x.java.spi.configuration.annotations.defaults.BooleanDefaultValue;
+import org.apache.plc4x.java.spi.configuration.annotations.defaults.StringDefaultValue;
 import org.apache.plc4x.java.transport.rawsocket.RawSocketTransportConfiguration;
 import org.apache.plc4x.java.utils.pcap.netty.handlers.PacketHandler;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 public class ProfinetConfiguration implements Configuration, RawSocketTransportConfiguration {
 
     @Override
@@ -44,6 +55,18 @@ public class ProfinetConfiguration implements Configuration, RawSocketTransportC
         return null;
     }
 
+    @ConfigurationParameter("devices")
+    @StringDefaultValue("")
+    private String devices;
+
+    public HashMap<MacAddress, ProfinetDevice> configuredDevices = new HashMap<>();
+
+    public void setDevices(String sDevices) throws DecoderException {
+        // TODO:- Add support for passing in configured devices.
+        MacAddress macAddress = new MacAddress(Hex.decodeHex("005056c00001"));
+        configuredDevices.put(macAddress, new ProfinetDevice(macAddress));
+    }
+
     @Override
     public String toString() {
         return "Configuration{" +
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetCallable.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetCallable.java
new file mode 100644
index 000000000..fd117c9f4
--- /dev/null
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetCallable.java
@@ -0,0 +1,10 @@
+package org.apache.plc4x.java.profinet.device;
+
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.profinet.readwrite.DceRpc_Packet;
+
+public interface ProfinetCallable {
+    void handle(DceRpc_Packet packet) throws PlcException;
+
+    DceRpc_Packet create() throws PlcException;
+}
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDevice.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDevice.java
new file mode 100644
index 000000000..80f10d961
--- /dev/null
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDevice.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.profinet.device;
+
+import io.netty.channel.Channel;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.messages.PlcDiscoveryItem;
+import org.apache.plc4x.java.profinet.protocol.ProfinetProtocolLogic;
+import org.apache.plc4x.java.profinet.readwrite.*;
+import org.apache.plc4x.java.spi.ConversationContext;
+import org.apache.plc4x.java.spi.generation.*;
+import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ProfinetDevice {
+
+    private static final int DEFAULT_UDP_PORT = 34964;
+    private final Logger logger = LoggerFactory.getLogger(ProfinetDevice.class);
+    private final DceRpc_ActivityUuid uuid;
+
+    private DatagramSocket udpSocket;
+    private RawSocketChannel rawSocketChannel;
+    private Channel channel;
+    private final MacAddress macAddress;
+    private ConversationContext<Ethernet_Frame> context;
+    private ProfinetDeviceState state = ProfinetDeviceState.IDLE;
+    private Lldp_Pdu lldpPdu = null;
+    private PnDcp_Pdu dcpPdu = null;
+    private String ipAddress;
+    private String portId;
+
+    private AtomicInteger sessionKeyGenerator = new AtomicInteger(1);
+
+
+    private void closeUDPSocket() {
+        // Handle the closing of the connection, might need to send some messages beforehand.
+        if (udpSocket != null && !udpSocket.isConnected()) {
+            udpSocket.close();
+            context.getChannel().close();
+        }
+    }
+
+    private boolean createUDPSocket() {
+        if (state != ProfinetDeviceState.IDLE) {
+            closeUDPSocket();
+        }
+        if (!(channel instanceof RawSocketChannel)) {
+            logger.warn("Expected a 'raw' transport, closing channel...");
+            closeUDPSocket();
+            return false;
+        }
+
+        rawSocketChannel = (RawSocketChannel) channel;
+
+        // Create an udp socket
+        try {
+            udpSocket = new DatagramSocket();
+        } catch (SocketException e) {
+            logger.warn("Unable to create udp socket " + e.getMessage());
+            closeUDPSocket();
+            return false;
+        }
+        return true;
+    }
+
+    public boolean onConnect() {
+        if (!createUDPSocket()) {
+            // Unable to create UDP connection
+            return false;
+        }
+
+        ProfinetMessageWrapper.sendMessage(
+            new CreateConnection(),
+            this
+        );
+
+        return false;
+    }
+
+    private int generateSessionKey() {
+        // Generate a new session key.
+        Integer sessionKey = sessionKeyGenerator.getAndIncrement();
+        // Reset the session key as soon as it reaches the max for a 16 bit uint
+        if (sessionKeyGenerator.get() == 0xFFFF) {
+            sessionKeyGenerator.set(1);
+        }
+        return sessionKey;
+    }
+
+    public boolean hasLldpPdu() {
+        if (lldpPdu != null) {
+            return true;
+        }
+        return false;
+    }
+
+    public boolean hasDcpPdu() {
+        if (dcpPdu != null) {
+            return true;
+        }
+        return false;
+    }
+
+    public void handle(PlcDiscoveryItem item) {
+        logger.debug("Received Discovered item at device");
+        if (item.getOptions().containsKey("IpAddress")) {
+            this.ipAddress = item.getOptions().get("IpAddress");
+        }
+        if (item.getOptions().containsKey("PortId")) {
+            this.portId = item.getOptions().get("PortId");
+        }
+    }
+
+    public void setContext(ConversationContext<Ethernet_Frame> context) {
+        this.context = context;
+        channel = context.getChannel();
+    }
+
+    public ProfinetDevice(MacAddress macAddress) {
+        this.macAddress = macAddress;
+        // Generate a new Activity Id, which will be used throughout the connection.
+        this.uuid = generateActivityUuid();
+    }
+
+    protected static DceRpc_ActivityUuid generateActivityUuid() {
+        UUID number = UUID.randomUUID();
+        try {
+            WriteBufferByteBased wb = new WriteBufferByteBased(128);
+            wb.writeLong(64, number.getMostSignificantBits());
+            wb.writeLong(64, number.getLeastSignificantBits());
+
+            ReadBuffer rb = new ReadBufferByteBased(wb.getData());
+            return new DceRpc_ActivityUuid(rb.readLong(32), rb.readInt(16), rb.readInt(16), rb.readByteArray(8));
+        } catch (SerializationException | ParseException e) {
+            // Ignore ... this should actually never happen.
+        }
+        return null;
+    }
+
+    public DatagramSocket getUdpSocket() {
+        return this.udpSocket;
+    }
+
+    public InetAddress getIpAddress() throws UnknownHostException {
+        return InetAddress.getByName(this.ipAddress);
+    }
+
+    public int getPort() {
+        return DEFAULT_UDP_PORT;
+    }
+
+    public class CreateConnection implements ProfinetCallable {
+
+        public DceRpc_Packet create() throws PlcException {
+            try {
+                return new DceRpc_Packet(
+                    DceRpc_PacketType.REQUEST, true, false, false,
+                    IntegerEncoding.BIG_ENDIAN, CharacterEncoding.ASCII, FloatingPointEncoding.IEEE,
+                    new DceRpc_ObjectUuid((byte) 0x00, 0x0001, 0x0904, 0x002A),
+                    new DceRpc_InterfaceUuid_DeviceInterface(),
+                    ProfinetDevice.this.uuid,
+                    0, 0, DceRpc_Operation.CONNECT,
+                    new PnIoCm_Packet_Req(16696, 16696, 0, 0,
+                        Arrays.asList(
+                            new PnIoCm_Block_ArReq((short) 1, (short) 0, PnIoCm_ArType.IO_CONTROLLER,
+                                new Uuid(Hex.decodeHex("654519352df3b6428f874371217c2b51")),
+                                ProfinetDevice.this.generateSessionKey(),
+                                ProfinetDevice.this.macAddress,
+                                new Uuid(Hex.decodeHex("dea000006c9711d1827100640008002a")),
+                                false, true, false,
+                                false, PnIoCm_CompanionArType.SINGLE_AR, false,
+                                true, false, PnIoCm_State.ACTIVE,
+                                600,
+                                // This actually needs to be set to this value and not the real port number.
+                                0x8892,
+                                // It seems that it must be set to this value, or it won't work.
+                                "plc4x"),
+                            new PnIoCm_Block_IoCrReq((short) 1, (short) 0, PnIoCm_IoCrType.INPUT_CR,
+                                0x0001,
+                                0x8892,
+                                false, false,
+                                false, false, PnIoCm_RtClass.RT_CLASS_2, 40,
+                                0xBBF0, 128, 8, 1, 0, 0xffffffff,
+                                50, 50, 0xC000,
+                                new org.apache.plc4x.java.profinet.readwrite.MacAddress(Hex.decodeHex("000000000000")),
+                                Collections.singletonList(
+                                    new PnIoCm_IoCrBlockReqApi(
+                                        Arrays.asList(
+                                            new PnIoCm_IoDataObject(0, 0x0001, 0),
+                                            new PnIoCm_IoDataObject(0, 0x8000, 1),
+                                            new PnIoCm_IoDataObject(0, 0x8001, 2)
+                                        ),
+                                        new ArrayList<PnIoCm_IoCs>(0))
+                                )),
+                            new PnIoCm_Block_IoCrReq((short) 1, (short) 0, PnIoCm_IoCrType.OUTPUT_CR,
+                                0x0002, 0x8892, false, false,
+                                false, false, PnIoCm_RtClass.RT_CLASS_2, 40,
+                                0xFFFF, 128, 8, 1, 0, 0xffffffff,
+                                50, 50, 0xC000,
+                                new MacAddress(Hex.decodeHex("000000000000")),
+                                Collections.singletonList(
+                                    new PnIoCm_IoCrBlockReqApi(
+                                        new ArrayList<PnIoCm_IoDataObject>(0),
+                                        Arrays.asList(
+                                            new PnIoCm_IoCs(0, 0x0001, 0),
+                                            new PnIoCm_IoCs(0, 0x8000, 1),
+                                            new PnIoCm_IoCs(0, 0x8001, 2)
+                                        )
+                                    )
+                                )
+                            ),
+                            new PnIoCm_Block_ExpectedSubmoduleReq((short) 1, (short) 0,
+                                Collections.singletonList(
+                                    new PnIoCm_ExpectedSubmoduleBlockReqApi(0,
+                                        0x00000001, 0x00000000,
+                                        Arrays.asList(
+                                            new PnIoCm_Submodule_NoInputNoOutputData(0x0001,
+                                                0x00000001, false, false,
+                                                false, false),
+                                            new PnIoCm_Submodule_NoInputNoOutputData(0x8000,
+                                                0x00008000, false, false,
+                                                false, false),
+                                            new PnIoCm_Submodule_NoInputNoOutputData(0x8001,
+                                                0x00008001, false, false,
+                                                false, false)
+                                        )
+                                    )
+                                )
+                            ),
+                            new PnIoCm_Block_AlarmCrReq((short) 1, (short) 0,
+                                PnIoCm_AlarmCrType.ALARM_CR, 0x8892, false, false, 1, 3,
+                                0x0000, 200, 0xC000, 0xA000)
+                        ))
+                );
+
+            /*// Build the UDP/IP/EthernetFrame to transport the package.
+            return new Ethernet_Frame(profinetDriverContext.getRemoteMacAddress(), profinetDriverContext.getLocalMacAddress(),
+                new Ethernet_FramePayload_IPv4(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE), (short) 64,
+                    profinetDriverContext.getLocalIpAddress(), profinetDriverContext.getRemoteIpAddress(),
+                    new Udp_Packet(profinetDriverContext.getLocalUdpPort(), profinetDriverContext.getRemoteUdpPort(),
+                        dceRpcConnectionRequest)));*/
+            } catch (DecoderException e) {
+                throw new PlcException("Error creating connection request", e);
+            }
+        }
+
+        public void handle(DceRpc_Packet dceRpc_packet) throws PlcException {
+            if ((dceRpc_packet.getOperation() == DceRpc_Operation.CONNECT) && (dceRpc_packet.getPacketType() == DceRpc_PacketType.RESPONSE)) {
+                if (dceRpc_packet.getPayload().getPacketType() == DceRpc_PacketType.RESPONSE) {
+
+                    // Get the remote MAC address and store it in the context.
+                    final PnIoCm_Packet_Res connectResponse = (PnIoCm_Packet_Res) dceRpc_packet.getPayload();
+                    if ((connectResponse.getBlocks().size() > 0) && (connectResponse.getBlocks().get(0) instanceof PnIoCm_Block_ArRes)) {
+                        final PnIoCm_Block_ArRes pnIoCm_block_arRes = (PnIoCm_Block_ArRes) connectResponse.getBlocks().get(0);
+
+                        // Update the raw-socket transports filter expression.
+                        ((RawSocketChannel) channel).setRemoteMacAddress(org.pcap4j.util.MacAddress.getByAddress(macAddress.getAddress()));
+                    } else {
+                        throw new PlcException("Unexpected type of first block.");
+                    }
+                } else {
+                    throw new PlcException("Unexpected response");
+                }
+            } else if (dceRpc_packet.getPacketType() == DceRpc_PacketType.REJECT) {
+                throw new PlcException("Device rejected connection request");
+            } else {
+                throw new PlcException("Unexpected response");
+            }
+        }
+    }
+}
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDeviceMessageHandler.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDeviceMessageHandler.java
new file mode 100644
index 000000000..b737801fa
--- /dev/null
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetDeviceMessageHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.profinet.device;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.plc4x.java.api.messages.PlcDiscoveryItem;
+import org.apache.plc4x.java.api.messages.PlcDiscoveryItemHandler;
+import org.apache.plc4x.java.profinet.readwrite.MacAddress;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProfinetDeviceMessageHandler implements PlcDiscoveryItemHandler {
+
+    private HashMap<MacAddress, ProfinetDevice> configuredDevices;
+
+    @Override
+    public void handle(PlcDiscoveryItem discoveryItem) {
+        try {
+            MacAddress macAddress = new MacAddress(Hex.decodeHex(discoveryItem.getOptions().get("MacAddress")));
+            if (configuredDevices.containsKey(macAddress)) {
+                configuredDevices.get(macAddress).handle(discoveryItem);
+            }
+        } catch (DecoderException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setConfiguredDevices(HashMap<MacAddress, ProfinetDevice> configuredDevices) {
+        this.configuredDevices = configuredDevices;
+    }
+}
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetMessageWrapper.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetMessageWrapper.java
new file mode 100644
index 000000000..bee0ae8d3
--- /dev/null
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/device/ProfinetMessageWrapper.java
@@ -0,0 +1,48 @@
+package org.apache.plc4x.java.profinet.device;
+
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.profinet.readwrite.*;
+import org.apache.plc4x.java.spi.generation.ParseException;
+import org.apache.plc4x.java.spi.generation.ReadBufferByteBased;
+import org.apache.plc4x.java.spi.generation.SerializationException;
+import org.apache.plc4x.java.spi.generation.WriteBufferByteBased;
+import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannel;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+
+public class ProfinetMessageWrapper {
+
+    public static void sendMessage(ProfinetCallable callable, ProfinetDevice context) throws RuntimeException {
+        try {
+            DceRpc_Packet packet = callable.create();
+            // Serialize it to a byte-payload
+            WriteBufferByteBased writeBuffer = new WriteBufferByteBased(packet.getLengthInBytes());
+            packet.serialize(writeBuffer);
+            // Create a udp packet.
+            DatagramPacket connectRequestPacket = new DatagramPacket(writeBuffer.getData(), writeBuffer.getData().length);
+            connectRequestPacket.setAddress(context.getIpAddress());
+            connectRequestPacket.setPort(context.getPort());
+
+            // Send it.
+            context.getUdpSocket().send(connectRequestPacket);
+
+            // Receive the response.
+            byte[] resultBuffer = new byte[packet.getLengthInBytes()];
+            DatagramPacket connectResponsePacket = new DatagramPacket(resultBuffer, resultBuffer.length);
+            context.getUdpSocket().receive(connectResponsePacket);
+            ReadBufferByteBased readBuffer = new ReadBufferByteBased(resultBuffer);
+            final DceRpc_Packet dceRpc_packet = DceRpc_Packet.staticParse(readBuffer);
+            callable.handle(dceRpc_packet);
+        } catch (SerializationException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        } catch (PlcException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+}
diff --git a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/protocol/ProfinetProtocolLogic.java b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/protocol/ProfinetProtocolLogic.java
index efbac95c4..fe931765c 100644
--- a/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/protocol/ProfinetProtocolLogic.java
+++ b/plc4j/drivers/profinet/src/main/java/org/apache/plc4x/java/profinet/protocol/ProfinetProtocolLogic.java
@@ -24,13 +24,20 @@ import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.profinet.config.ProfinetConfiguration;
 import org.apache.plc4x.java.profinet.context.ProfinetDriverContext;
+import org.apache.plc4x.java.profinet.device.ProfinetDevice;
+import org.apache.plc4x.java.profinet.device.ProfinetDeviceMessageHandler;
 import org.apache.plc4x.java.profinet.discovery.ProfinetPlcDiscoverer;
 import org.apache.plc4x.java.profinet.readwrite.*;
 import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.Plc4xProtocolBase;
+import org.apache.plc4x.java.spi.configuration.HasConfiguration;
 import org.apache.plc4x.java.spi.generation.*;
 import org.apache.plc4x.java.spi.messages.DefaultPlcDiscoveryRequest;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
 import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannel;
 import org.pcap4j.core.PcapAddress;
 import org.pcap4j.core.PcapNativeException;
@@ -45,13 +52,11 @@ import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
+public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> implements HasConfiguration<ProfinetConfiguration>, PlcSubscriber {
 
     public static final Duration REQUEST_TIMEOUT = Duration.ofMillis(10000);
-
-    private static AtomicInteger sessionKeyGenerator = new AtomicInteger(1);
-
     private final Logger logger = LoggerFactory.getLogger(ProfinetProtocolLogic.class);
 
     private ProfinetDriverContext profinetDriverContext;
@@ -61,6 +66,10 @@ public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
     private RawSocketChannel rawSocketChannel;
     private Channel channel;
 
+    private ProfinetDeviceMessageHandler handler = new ProfinetDeviceMessageHandler();
+
+    private ProfinetConfiguration configuration;
+
     private static final Uuid ARUUID;
 
     static {
@@ -72,53 +81,66 @@ public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
     }
 
     @Override
-    public void setContext(ConversationContext<Ethernet_Frame> context) {
-        super.setContext(context);
-        this.profinetDriverContext = (ProfinetDriverContext) driverContext;
+    public void setConfiguration(ProfinetConfiguration configuration) {
+        this.configuration = configuration;
+        this.handler.setConfiguredDevices(configuration.configuredDevices);
     }
 
     @Override
-    public void onConnect(ConversationContext<Ethernet_Frame> context) {
-        channel = context.getChannel();
-        connected = false;
-        if (!(channel instanceof RawSocketChannel)) {
-            logger.warn("Expected a 'raw' transport, closing channel...");
-            context.getChannel().close();
-            return;
+    public void setContext(ConversationContext<Ethernet_Frame> context) {
+        super.setContext(context);
+        this.profinetDriverContext = (ProfinetDriverContext) driverContext;
+        for (Map.Entry<MacAddress, ProfinetDevice> device : configuration.configuredDevices.entrySet()) {
+            device.getValue().setContext(context);
         }
-
-        rawSocketChannel = (RawSocketChannel) channel;
-
-        // Create an udp socket
         try {
-            udpSocket = new DatagramSocket();
-        } catch (SocketException e) {
-            logger.warn("Unable to create udp socket " + e.getMessage());
-            context.getChannel().close();
-            return;
+            onDeviceDiscovery();
+        } catch (InterruptedException e) {
         }
+    }
 
+    private void onDeviceDiscovery() throws InterruptedException {
         ProfinetPlcDiscoverer discoverer = new ProfinetPlcDiscoverer();
         DefaultPlcDiscoveryRequest request = new DefaultPlcDiscoveryRequest(
             discoverer,
             new LinkedHashMap<>()
         );
 
-        discoverer.ongoingDiscoverWithHandler(
+        // TODO:- Add handler for un-requested messages
+        discoverer.discoverWithHandler(
             request,
-            null,
-            5000L,
-            30000L
+            handler
         );
+        waitForDeviceDiscovery();
+    }
 
-        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-        // Initialize some important datastructures, that will be used a lot.
+    private void waitForDeviceDiscovery() throws InterruptedException {
+        // Once we receive an LLDP and PN-DCP message for each device move on.
+        boolean discovered = false;
+        int count = 0;
+        while (!discovered) {
+            discovered = true;
+            for (Map.Entry<MacAddress, ProfinetDevice> device : configuration.configuredDevices.entrySet()) {
+                if (!device.getValue().hasLldpPdu() || !device.getValue().hasDcpPdu()) {
+                    discovered = false;
+                }
+            }
+            if (!discovered) {
+                Thread.sleep(3000L);
+                count += 1;
+            }
+            if (count > 5) {
+                break;
+            }
+        }
+    }
 
-        // Generate a new Activity Id, which will be used throughout the connection.
-        profinetDriverContext.setDceRpcActivityUuid(generateActivityUuid());
+    @Override
+    public void onConnect(ConversationContext<Ethernet_Frame> context) {
 
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Initialize some important datastructures, that will be used a lot.
         // TODO: Possibly we can remove the ARP lookup and simply use the mac address in the connection-response.
-
         // Local connectivity attributes
         profinetDriverContext.setLocalMacAddress(new MacAddress(rawSocketChannel.getLocalMacAddress().getAddress()));
         final InetSocketAddress localAddress = (InetSocketAddress) rawSocketChannel.getLocalAddress();
@@ -127,24 +149,8 @@ public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
         // Use the port of the udp socket
         profinetDriverContext.setLocalUdpPort(udpSocket.getPort());
 
-        // Remote connectivity attributes
-        byte[] macAddress = null;
-        try {
-            macAddress = Hex.decodeHex("000000000000");
-        } catch (DecoderException e) {
-            // Ignore this.
-        }
-        profinetDriverContext.setRemoteMacAddress(new MacAddress(macAddress));
-        final InetSocketAddress remoteAddress = (InetSocketAddress) rawSocketChannel.getRemoteAddress();
-        Inet4Address remoteIpAddress = (Inet4Address) remoteAddress.getAddress();
-        profinetDriverContext.setRemoteIpAddress(new IpAddress(remoteIpAddress.getAddress()));
-        profinetDriverContext.setRemoteUdpPort(remoteAddress.getPort());
-
-        // Generate a new session key.
-        profinetDriverContext.setSessionKey(sessionKeyGenerator.getAndIncrement());
-        // Reset the session key as soon as it reaches the max for a 16 bit uint
-        if (sessionKeyGenerator.get() == 0xFFFF) {
-            sessionKeyGenerator.set(1);
+        for (Map.Entry<MacAddress, ProfinetDevice> device : configuration.configuredDevices.entrySet()) {
+            device.getValue().onConnect();
         }
 
         ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -350,7 +356,17 @@ public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
         return future;
     }
 
-        @Override
+    @Override
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        return null;
+    }
+
+    @Override
+    public void unregister(PlcConsumerRegistration registration) {
+
+    }
+
+    @Override
     protected void decode(ConversationContext<Ethernet_Frame> context, Ethernet_Frame msg) throws Exception {
         super.decode(context, msg);
     }
@@ -556,19 +572,6 @@ public class ProfinetProtocolLogic extends Plc4xProtocolBase<Ethernet_Frame> {
         );
     }
 
-    protected static DceRpc_ActivityUuid generateActivityUuid() {
-        UUID number = UUID.randomUUID();
-        try {
-            WriteBufferByteBased wb = new WriteBufferByteBased(128);
-            wb.writeLong(64, number.getMostSignificantBits());
-            wb.writeLong(64, number.getLeastSignificantBits());
-
-            ReadBuffer rb = new ReadBufferByteBased(wb.getData());
-            return new DceRpc_ActivityUuid(rb.readLong(32), rb.readInt(16), rb.readInt(16), rb.readByteArray(8));
-        } catch (SerializationException | ParseException e) {
-            // Ignore ... this should actually never happen.
-        }
-        return null;
-    }
+
 
 }
diff --git a/protocols/profinet/src/main/resources/protocols/profinet/profinet.mspec b/protocols/profinet/src/main/resources/protocols/profinet/profinet.mspec
index 9a964a6e1..a0d8f1ecd 100644
--- a/protocols/profinet/src/main/resources/protocols/profinet/profinet.mspec
+++ b/protocols/profinet/src/main/resources/protocols/profinet/profinet.mspec
@@ -1049,6 +1049,14 @@
     ['0x8112' IOX_BLOCK_RES               ]
 ]
 
+[enum uint 16 ProfinetDeviceState
+    ['0x00'     IDLE]
+    ['0x01'     STARTUP]
+    ['0x02'     PRMEND]
+    ['0x03'     APPLRDY]
+    ['0x04'     ABORT]
+]
+
 [enum uint 16 PnIoCm_ArType
     ['0x0001' IO_CONTROLLER]
 ]