You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/08/17 13:49:07 UTC

[incubator-plc4x] branch feature/apache-kafka updated (6f8287f -> 10d51f9)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 6f8287f  Stub of kafka-connect
     add 2d4b799  Some more fine-tuning of the start-page
     add d04b40d  Introduced builder for PlcSubscriptionRequest and PlcUnsubscriptionRequest + generified items for subscription + reordered SubscriptionRequestCyclicItem so that consumer is the last parameter. + adjusted manual test
     add 73a3f42  moved PlcRequestContainer to driver-bases
     add ec3fb9f  added toString()/equals()/hashCode() to AdsSubscriptionHandle
     add 47ff4f0  added an immutability test to plc4j-api to track immutability
     add 78853ad  - Make the S7 Driver inspect the type of S7 device it is connected to.
     new a46b4c9  Merge
     new 10d51f9  Merge

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dummydriver/connection/DummyConnection.java    |  12 +-
 .../examples/dummydriver/netty/DummyProtocol.java  |   6 +-
 .../plc4x/java/examples/plclogger/PlcLogger.java   |   2 +-
 examples/plclogger/src/main/resources/logback.xml  |   2 +-
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |   9 +-
 plc4j/api/pom.xml                                  |   6 +
 .../java/api/messages/PlcSubscriptionRequest.java  |  51 ++-
 .../api/messages/PlcUnsubscriptionRequest.java     |  57 ++++
 .../api/messages/items/SubscriptionEventItem.java  |   6 +-
 .../SubscriptionRequestChangeOfStateItem.java      |   4 +-
 .../items/SubscriptionRequestCyclicItem.java       |  10 +-
 .../items/SubscriptionRequestEventItem.java        |   6 +-
 .../messages/items/SubscriptionRequestItem.java    |   4 +-
 .../apache/plc4x/java/api/ImmutabilityTest.java    | 103 +++++++
 .../ads/connection/AdsAbstractPlcConnection.java   |   1 +
 .../java/ads/connection/AdsTcpPlcConnection.java   |   1 +
 .../java/ads/model/AdsSubscriptionHandle.java      |  25 ++
 .../plc4x/java/ads/protocol/Plc4x2AdsProtocol.java |   1 +
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  |  36 ++-
 .../connection/AdsAbstractPlcConnectionTest.java   |   1 +
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  10 +-
 .../java/ads/protocol/Plc4x2AdsProtocolTest.java   |   6 +-
 .../java/base}/messages/PlcRequestContainer.java   |  42 +--
 .../base}/messages/PlcRequestContainerTest.java    |   8 +-
 .../modbus/connection/BaseModbusPlcConnection.java |   6 +-
 .../java/modbus/netty/Plc4XModbusProtocol.java     |   1 +
 .../java/modbus/netty/Plc4XModbusProtocolTest.java |   6 +-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   6 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |   3 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java | 341 +++++++++++++++++----
 ...7ConnectedEvent.java => S7IdentifiedEvent.java} |   2 +-
 .../netty/model/params/CpuServicesParameter.java   |  25 +-
 ...meter.java => CpuServicesRequestParameter.java} |  11 +-
 .../model/params/CpuServicesResponseParameter.java |  50 +++
 .../netty/model/payloads/CpuServicesPayload.java   |  71 +++++
 .../netty/model/payloads/ssls/SslDataRecord.java}  |   7 +-
 .../ssls/SslModuleIdentificationDataRecord.java    |  66 ++++
 ...java => CpuServicesParameterFunctionGroup.java} |  27 +-
 ...a => CpuServicesParameterSubFunctionGroup.java} |  27 +-
 .../plc4x/java/s7/netty/model/types/SslId.java     |  99 ++++++
 .../plc4x/java/s7/netty/util/S7SizeHelper.java     |  20 ++
 .../java/s7/connection/S7PlcConnectionIT.java      |   3 +-
 .../java/s7/connection/S7PlcTestConnection.java    |  35 ++-
 .../plc4x/java/s7/netty/Plc4XS7ProtocolTest.java   |   2 +-
 .../s7/netty/model/params/S7ParameterTests.java    |   8 +-
 .../plc4x/java/s7/netty/util/S7SizeHelperTest.java |   5 +-
 .../s7/connection/s7-cpu-functions-response.pcap   | Bin 0 -> 219 bytes
 src/site/asciidoc/index.adoc                       |  17 +-
 48 files changed, 1061 insertions(+), 186 deletions(-)
 create mode 100644 plc4j/api/src/test/java/org/apache/plc4x/java/api/ImmutabilityTest.java
 rename plc4j/{api/src/main/java/org/apache/plc4x/java/api => protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base}/messages/PlcRequestContainer.java (61%)
 rename plc4j/{api/src/test/java/org/apache/plc4x/java/api => protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base}/messages/PlcRequestContainerTest.java (90%)
 copy plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/events/{S7ConnectedEvent.java => S7IdentifiedEvent.java} (96%)
 copy plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/{CpuServicesParameter.java => CpuServicesRequestParameter.java} (63%)
 create mode 100644 plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/CpuServicesResponseParameter.java
 create mode 100644 plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/payloads/CpuServicesPayload.java
 copy plc4j/protocols/{driver-bases/base/src/main/java/org/apache/plc4x/java/base/events/ConnectedEvent.java => s7/src/main/java/org/apache/plc4x/java/s7/netty/model/payloads/ssls/SslDataRecord.java} (85%)
 create mode 100644 plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/payloads/ssls/SslModuleIdentificationDataRecord.java
 copy plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/{DataTransportErrorCode.java => CpuServicesParameterFunctionGroup.java} (62%)
 copy plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/{DataTransportErrorCode.java => CpuServicesParameterSubFunctionGroup.java} (61%)
 create mode 100644 plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/SslId.java
 create mode 100644 plc4j/protocols/s7/src/test/resources/org/apache/plc4x/java/s7/connection/s7-cpu-functions-response.pcap


[incubator-plc4x] 02/02: Merge

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 10d51f9342c0a7be4e66df079281cfcf9accc457
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Aug 17 15:49:02 2018 +0200

    Merge
---
 .../dummydriver/connection/DummyConnection.java    |  12 +-
 .../examples/dummydriver/netty/DummyProtocol.java  |   6 +-
 .../plc4x/java/examples/plclogger/PlcLogger.java   |   2 +-
 examples/plclogger/src/main/resources/logback.xml  |   2 +-
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |   9 +-
 plc4j/api/pom.xml                                  |   6 +
 .../java/api/messages/PlcSubscriptionRequest.java  |  51 ++-
 .../api/messages/PlcUnsubscriptionRequest.java     |  57 ++++
 .../api/messages/items/SubscriptionEventItem.java  |   6 +-
 .../SubscriptionRequestChangeOfStateItem.java      |   4 +-
 .../items/SubscriptionRequestCyclicItem.java       |  10 +-
 .../items/SubscriptionRequestEventItem.java        |   6 +-
 .../messages/items/SubscriptionRequestItem.java    |   4 +-
 .../ads/connection/AdsAbstractPlcConnection.java   |   1 +
 .../java/ads/connection/AdsTcpPlcConnection.java   |   1 +
 .../java/ads/model/AdsSubscriptionHandle.java      |  25 ++
 .../plc4x/java/ads/protocol/Plc4x2AdsProtocol.java |   1 +
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  |  36 ++-
 .../connection/AdsAbstractPlcConnectionTest.java   |   1 +
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  10 +-
 .../java/ads/protocol/Plc4x2AdsProtocolTest.java   |   6 +-
 .../modbus/connection/BaseModbusPlcConnection.java |   6 +-
 .../java/modbus/netty/Plc4XModbusProtocol.java     |   1 +
 .../java/modbus/netty/Plc4XModbusProtocolTest.java |   6 +-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   6 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |   3 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java | 341 +++++++++++++++++----
 .../netty/model/params/CpuServicesParameter.java   |  25 +-
 .../plc4x/java/s7/netty/util/S7SizeHelper.java     |  20 ++
 .../java/s7/connection/S7PlcConnectionIT.java      |   3 +-
 .../java/s7/connection/S7PlcTestConnection.java    |  35 ++-
 .../plc4x/java/s7/netty/Plc4XS7ProtocolTest.java   |   2 +-
 .../s7/netty/model/params/S7ParameterTests.java    |   8 +-
 .../plc4x/java/s7/netty/util/S7SizeHelperTest.java |   5 +-
 src/site/asciidoc/index.adoc                       |  17 +-
 35 files changed, 609 insertions(+), 125 deletions(-)

diff --git a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
index abb3fea..3cbfd45 100644
--- a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
+++ b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
@@ -18,24 +18,26 @@ under the License.
 */
 package org.apache.plc4x.java.examples.dummydriver.connection;
 
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.examples.dummydriver.model.DummyAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.util.concurrent.CompletableFuture;
+
 public class DummyConnection extends AbstractPlcConnection implements PlcReader, PlcWriter {
 
     @SuppressWarnings("unused")
diff --git a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/netty/DummyProtocol.java b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/netty/DummyProtocol.java
index bf448ea..96514c7 100644
--- a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/netty/DummyProtocol.java
+++ b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/netty/DummyProtocol.java
@@ -25,7 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageCodec;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcRequest;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +36,7 @@ public class DummyProtocol extends MessageToMessageCodec<ByteBuf, PlcRequestCont
     private static final Logger logger = LoggerFactory.getLogger(DummyProtocol.class);
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, PlcRequestContainer in, List<Object> out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, PlcRequestContainer in, List<Object> out) {
         PlcRequest request = in.getRequest();
         if (request instanceof PlcReadRequest) {
 
@@ -61,7 +61,7 @@ public class DummyProtocol extends MessageToMessageCodec<ByteBuf, PlcRequestCont
     }
 
     @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
         if(logger.isTraceEnabled()) {
             logger.trace("Got Data: {}", ByteBufUtil.hexDump(in));
         }
diff --git a/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java b/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
index c24193b..ffc14ec 100644
--- a/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
+++ b/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
@@ -32,7 +32,7 @@ public class PlcLogger {
     public static void main(String[] args) throws Exception {
         if(args.length != 3) {
             System.out.println("Usage: PlcLogger {connection-string} {resource-address-string} {interval-ms}");
-            System.out.println("Example: PlcLogger s7://192.168.0.1/0/0 INPUTS/0 10");
+            System.out.println("Example: PlcLogger s7://10.10.64.20/0/0 INPUTS/0 10");
         }
 
         String connectionString = args[0];
diff --git a/examples/plclogger/src/main/resources/logback.xml b/examples/plclogger/src/main/resources/logback.xml
index bba8e02..27d40c0 100644
--- a/examples/plclogger/src/main/resources/logback.xml
+++ b/examples/plclogger/src/main/resources/logback.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="warn">
+  <root level="info">
     <appender-ref ref="STDOUT" />
   </root>
 
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 16fc626..e04e016 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -29,7 +29,10 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestCyclicItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +86,9 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     @Override
     protected void doStart() throws InterruptedException, ExecutionException, TimeoutException {
         PlcSubscriptionRequest request = new PlcSubscriptionRequest();
-        request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3));
+        @SuppressWarnings("unchecked")
+        SubscriptionRequestCyclicItem subscriptionRequestCyclicItem = new SubscriptionRequestCyclicItem(dataType, address, TimeUnit.SECONDS, 3, this);
+        request.addItem(subscriptionRequestCyclicItem);
         CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request);
         subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
     }
diff --git a/plc4j/api/pom.xml b/plc4j/api/pom.xml
index dbd5703..ac1484b 100644
--- a/plc4j/api/pom.xml
+++ b/plc4j/api/pom.xml
@@ -39,6 +39,12 @@
       <version>0.0.1-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mutabilitydetector</groupId>
+      <artifactId>MutabilityDetector</artifactId>
+      <version>0.9.6</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 5fa8ac4..67c4894 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -18,10 +18,59 @@ specific language governing permissions and limitations
 under the License.
 */
 
-import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.model.Address;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> {
 
+    public static PlcSubscriptionRequest.Builder builder() {
+        return new PlcSubscriptionRequest.Builder();
+    }
+
+    public static class Builder extends PlcRequest.Builder<SubscriptionRequestItem> {
+
+        public final <T> Builder addChangeOfStateItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+            // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestChangeOfStateItem<>(dataType, address, consumer));
+            return this;
+        }
+
+        public final <T> Builder addCyclicItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer, TimeUnit timeUnit, int period) {
+            // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestCyclicItem<>(dataType, address, timeUnit, period, consumer));
+            return this;
+        }
+
+        public final <T> Builder addEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+            // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestEventItem<>(dataType, address, consumer));
+            return this;
+        }
+
+        public final Builder addItem(SubscriptionRequestItem subscriptionRequestItem) {
+            requests.add(subscriptionRequestItem);
+            return this;
+        }
+
+        public final PlcSubscriptionRequest build() {
+            if (requests.isEmpty()) {
+                throw new IllegalStateException("No requests added");
+            }
+            PlcSubscriptionRequest plcSubscriptionRequest = new PlcSubscriptionRequest();
+            for (SubscriptionRequestItem request : requests) {
+                plcSubscriptionRequest.addItem(request);
+            }
+            return plcSubscriptionRequest;
+        }
+
+    }
+
     @Override
     public String toString() {
         return "PlcSubscriptionRequest{} " + super.toString();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index ed12177..bc46db7 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages;
 
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.SubscriptionHandle;
 
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class PlcUnsubscriptionRequest implements PlcMessage {
 
@@ -51,6 +54,60 @@ public class PlcUnsubscriptionRequest implements PlcMessage {
         return getRequestItems().size();
     }
 
+    public static PlcUnsubscriptionRequest.Builder builder() {
+        return new PlcUnsubscriptionRequest.Builder();
+    }
+
+    public static class Builder extends PlcRequest.Builder<UnsubscriptionRequestItem> {
+
+        public final Builder addHandle(SubscriptionHandle subscriptionHandle) {
+            requests.add(new UnsubscriptionRequestItem(subscriptionHandle));
+            return this;
+        }
+
+        public final Builder addHandle(SubscriptionHandle... subscriptionHandles) {
+            requests.addAll(Arrays.stream(subscriptionHandles).map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+            return this;
+        }
+
+        public final Builder addHandle(List<SubscriptionHandle> subscriptionHandles) {
+            requests.addAll(subscriptionHandles.stream().map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+            return this;
+        }
+
+        public final Builder addHandle(SubscriptionResponseItem subscriptionResponseItem) {
+            requests.add(new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+            return this;
+        }
+
+        public final Builder addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) {
+            requests.add(unsubscriptionRequestItem);
+            return this;
+        }
+
+        public final Builder addItem(UnsubscriptionRequestItem... unsubscriptionRequestItems) {
+            requests.addAll(Arrays.asList(unsubscriptionRequestItems));
+            return this;
+        }
+
+        public final Builder addItem(List<UnsubscriptionRequestItem> unsubscriptionRequestItems) {
+            requests.addAll(unsubscriptionRequestItems);
+            return this;
+        }
+
+        public final PlcUnsubscriptionRequest build() {
+            if (requests.isEmpty()) {
+                throw new IllegalStateException("No requests added");
+            }
+            PlcUnsubscriptionRequest plcUnsubscriptionRequest = new PlcUnsubscriptionRequest();
+            for (UnsubscriptionRequestItem request : requests) {
+                plcUnsubscriptionRequest.addItem(request);
+            }
+            return plcUnsubscriptionRequest;
+        }
+
+    }
+
     @Override
     public String toString() {
         return "PlcUnsubscriptionRequest{" +
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
index 60d95dc..5142d9a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
@@ -24,9 +24,9 @@ import java.util.Objects;
 
 public class SubscriptionEventItem<T> {
 
-    private SubscriptionRequestItem<T> subscriptionRequestItem;
-    private Calendar timestamp;
-    private List<T> values;
+    private final SubscriptionRequestItem<T> subscriptionRequestItem;
+    private final Calendar timestamp;
+    private final List<T> values;
 
     public SubscriptionEventItem(SubscriptionRequestItem<T> subscriptionRequestItem, Calendar timestamp, List<T> values) {
         this.subscriptionRequestItem = subscriptionRequestItem;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index 8e4b0eb..aa46b3a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -23,9 +23,9 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
 
 import java.util.function.Consumer;
 
-public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem {
+public class SubscriptionRequestChangeOfStateItem<T> extends SubscriptionRequestItem<T> {
 
-    public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) {
+    public SubscriptionRequestChangeOfStateItem(Class<T> datatype, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
         super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
     }
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index 22793b2..336d7b4 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -25,13 +25,13 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+public class SubscriptionRequestCyclicItem<T> extends SubscriptionRequestItem<T> {
 
-    private TimeUnit timeUnit;
-    private int period;
+    private final TimeUnit timeUnit;
+    private final int period;
 
-    public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) {
-        super(datatype, address, SubscriptionType.CYCLIC, consumer);
+    public SubscriptionRequestCyclicItem(Class<T> dataType, Address address, TimeUnit timeUnit, int period, Consumer<SubscriptionEventItem<T>> consumer) {
+        super(dataType, address, SubscriptionType.CYCLIC, consumer);
         this.timeUnit = timeUnit;
         this.period = period;
     }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e842d88..1abd1b4 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
 
 import java.util.function.Consumer;
 
-public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+public class SubscriptionRequestEventItem<T> extends SubscriptionRequestItem<T> {
 
-    public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) {
-        super(datatype, address, SubscriptionType.EVENT, consumer);
+    public SubscriptionRequestEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+        super(dataType, address, SubscriptionType.EVENT, consumer);
     }
 
     @Override
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
index 0667073..a38d258 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
@@ -26,8 +26,8 @@ import java.util.function.Consumer;
 
 public abstract class SubscriptionRequestItem<T> extends RequestItem<T> {
 
-    private SubscriptionType subscriptionType;
-    private Consumer<SubscriptionEventItem<T>> consumer;
+    private final SubscriptionType subscriptionType;
+    private final Consumer<SubscriptionEventItem<T>> consumer;
 
     public SubscriptionRequestItem(Class<T> datatype, Address address, SubscriptionType subscriptionType, Consumer<SubscriptionEventItem<T>> consumer) {
         super(datatype, address);
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index 125d012..9b394b8 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -40,6 +40,7 @@ import org.apache.plc4x.java.api.messages.items.RequestItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index e4458df..12fc9cf 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -46,6 +46,7 @@ import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.api.types.ResponseCode;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index 2850fa9..69742cb 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -21,6 +21,8 @@ package org.apache.plc4x.java.ads.model;
 import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
 import org.apache.plc4x.java.api.model.SubscriptionHandle;
 
+import java.util.Objects;
+
 public class AdsSubscriptionHandle implements SubscriptionHandle {
 
     private NotificationHandle notificationHandle;
@@ -33,4 +35,27 @@ public class AdsSubscriptionHandle implements SubscriptionHandle {
         return notificationHandle;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof AdsSubscriptionHandle)) {
+            return false;
+        }
+        AdsSubscriptionHandle that = (AdsSubscriptionHandle) o;
+        return Objects.equals(notificationHandle, that.notificationHandle);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(notificationHandle);
+    }
+
+    @Override
+    public String toString() {
+        return "AdsSubscriptionHandle{" +
+            "notificationHandle=" + notificationHandle +
+            '}';
+    }
 }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
index 7f0df2b..c0c0fbf 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
@@ -43,6 +43,7 @@ import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 3511e32..a0e28c8 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,15 +22,17 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
 import org.apache.plc4x.java.api.model.Address;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 public class ManualPlc4XAdsTest {
 
@@ -57,20 +59,24 @@ public class ManualPlc4XAdsTest {
             System.out.println("ResponseItem " + responseItem);
             responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println);
 
-            Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
             PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-            PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
-            subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer));
-            CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest);
-            PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
-            SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get();
 
-            PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
-            unsubscriptionRequest.addItem(
-                new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
-            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
-                plcSubscriber.unsubscribe(unsubscriptionRequest);
-            PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+            PlcSubscriptionRequest subscriptionRequest = PlcSubscriptionRequest.builder()
+                .addChangeOfStateItem(Integer.class, address, plcNotification -> System.out.println("Received notification " + plcNotification))
+                .build();
+
+            SubscriptionResponseItem subscriptionResponseItem = plcSubscriber.subscribe(subscriptionRequest)
+                .get(5, TimeUnit.SECONDS)
+                .getResponseItem().orElseThrow(() -> new RuntimeException("response not available"));
+
+            TimeUnit.SECONDS.sleep(5);
+
+            PlcUnsubscriptionRequest unsubscriptionRequest = PlcUnsubscriptionRequest.builder()
+                .addHandle(subscriptionResponseItem)
+                .build();
+
+            PlcUnsubscriptionResponse unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest)
+                .get(5, TimeUnit.SECONDS);
             System.out.println(unsubscriptionResponse);
         }
         System.exit(0);
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
index 4044858..1445363 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
@@ -39,6 +39,7 @@ import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
index 74fbca0..aa56d79 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
@@ -29,11 +29,13 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.model.AdsAddress;
 import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
+import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
 import org.apache.plc4x.java.api.messages.items.SubscriptionRequestChangeOfStateItem;
-import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
-import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,11 +52,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
-import static org.hamcrest.core.IsEqual.equalTo;
 
 public class AdsTcpPlcConnectionTests {
 
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
index ebfff15..d63fe18 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
@@ -31,9 +31,13 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.api.generic.types.Invoke;
 import org.apache.plc4x.java.ads.model.AdsAddress;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcRequest;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
 import org.apache.plc4x.java.api.messages.items.ResponseItem;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
index 6be492d..e8a671c 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
@@ -22,10 +22,14 @@ import io.netty.channel.ChannelFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.modbus.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
index 5a42e6b..00ca660 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
@@ -34,6 +34,7 @@ import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.messages.items.*;
 import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.modbus.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
index 7144720..dc60d22 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
@@ -25,12 +25,16 @@ import com.digitalpetri.modbus.responses.*;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcRequest;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
 import org.apache.plc4x.java.api.messages.items.ResponseItem;
 import org.apache.plc4x.java.api.messages.items.WriteResponseItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.base.protocol.Plc4XSupportedDataTypes;
 import org.apache.plc4x.java.modbus.model.*;
 import org.junit.Before;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index 772aa51..a9740d3 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -26,13 +26,17 @@ import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.events.ConnectEvent;
 import org.apache.plc4x.java.base.events.ConnectedEvent;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.isoontcp.netty.IsoOnTcpProtocol;
 import org.apache.plc4x.java.isotp.netty.IsoTPProtocol;
 import org.apache.plc4x.java.isotp.netty.model.tpdus.DisconnectRequestTpdu;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
index 64d66df..cc2b0f0 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
@@ -33,6 +33,7 @@ import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.api.types.ResponseCode;
 import org.apache.plc4x.java.base.PlcMessageToMessageCodec;
 import org.apache.plc4x.java.base.events.ConnectedEvent;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.s7.model.S7Address;
 import org.apache.plc4x.java.s7.model.S7BitAddress;
 import org.apache.plc4x.java.s7.model.S7DataBlockAddress;
@@ -65,7 +66,7 @@ import static org.apache.plc4x.java.s7.netty.util.S7TypeEncoder.encodeData;
  */
 public class Plc4XS7Protocol extends PlcMessageToMessageCodec<S7Message, PlcRequestContainer> {
 
-    private static final AtomicInteger tpduGenerator = new AtomicInteger(1);
+    private static final AtomicInteger tpduGenerator = new AtomicInteger(10);
 
     private Map<Short, PlcRequestContainer> requests;
 
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
index 902ab04..0195633 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
@@ -33,22 +33,26 @@ import org.apache.plc4x.java.isotp.netty.model.IsoTPMessage;
 import org.apache.plc4x.java.isotp.netty.model.tpdus.DataTpdu;
 import org.apache.plc4x.java.s7.netty.events.S7ConnectedEvent;
 import org.apache.plc4x.java.s7.netty.model.messages.*;
-import org.apache.plc4x.java.s7.netty.model.params.VarParameter;
-import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
-import org.apache.plc4x.java.s7.netty.model.params.SetupCommunicationParameter;
+import org.apache.plc4x.java.s7.netty.model.params.*;
 import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
 import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.CpuServicesPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
 import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslDataRecord;
+import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslModuleIdentificationDataRecord;
 import org.apache.plc4x.java.s7.netty.model.types.*;
 import org.apache.plc4x.java.s7.netty.strategies.DefaultS7MessageProcessor;
 import org.apache.plc4x.java.s7.netty.strategies.S7MessageProcessor;
 import org.apache.plc4x.java.s7.netty.util.S7SizeHelper;
+import org.apache.plc4x.java.s7.types.S7ControllerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 /**
@@ -83,6 +87,7 @@ public class S7Protocol extends ChannelDuplexHandler {
     private short maxAmqCaller;
     private short maxAmqCallee;
     private short pduSize;
+    private S7ControllerType controllerType;
 
     // For detecting the lower layers.
     private ChannelHandler prevChannelHandler;
@@ -206,19 +211,63 @@ public class S7Protocol extends ChannelDuplexHandler {
 
     private void encodePayloads(S7Message in, ByteBuf buf) {
         for (S7Payload payload : in.getPayloads()) {
-            ParameterType parameterType = payload.getType();
-
-            // When sending requests currently only write var has payloads.
-            if (parameterType == ParameterType.WRITE_VAR) {
-                VarPayload varPayload = (VarPayload) payload;
-                for (VarPayloadItem payloadItem : varPayload.getItems()) {
-                    buf.writeByte(payloadItem.getReturnCode().getCode());
-                    buf.writeByte(payloadItem.getDataTransportSize().getCode());
-                    // TODO: Check if this is correct?!?! Might be problems with sizeInBits = true/false
-                    buf.writeShort(payloadItem.getData().length);
-                    buf.writeBytes(payloadItem.getData());
-                    // TODO: It looks as if BIT type reads require a 0x00 fill byte at the end ...
-                }
+            switch (payload.getType()) { // only WRITE_VAR and CPU_SERVICES payloads are serialized; other types write nothing
+
+                case WRITE_VAR:
+                    VarPayload varPayload = (VarPayload) payload;
+                    for (VarPayloadItem payloadItem : varPayload.getItems()) {
+                        buf.writeByte(payloadItem.getReturnCode().getCode());
+                        buf.writeByte(payloadItem.getDataTransportSize().getCode());
+                        // TODO: Check if this is correct?!?! Might be problems with sizeInBits = true/false
+                        buf.writeShort(payloadItem.getData().length);
+                        buf.writeBytes(payloadItem.getData());
+                        // TODO: It looks as if BIT type reads require a 0x00 fill byte at the end ...
+                    }
+                    break;
+
+                case CPU_SERVICES:
+                    CpuServicesPayload cpuServicesPayload = (CpuServicesPayload) payload;
+                    buf.writeByte(cpuServicesPayload.getReturnCode().getCode());
+                    // This seems to be constantly set to this.
+                    buf.writeByte(DataTransportSize.OCTET_STRING.getCode());
+
+                    // A request payload is simple.
+                    if (cpuServicesPayload.getSslDataRecords().isEmpty()) {
+                        buf.writeShort(4);
+                        buf.writeShort(cpuServicesPayload.getSslId().getCode());
+                        buf.writeShort(cpuServicesPayload.getSslIndex());
+                    }
+                    // The response payload contains a lot more information.
+                    else {
+                        short length = 8;
+                        short sizeOfDataItem = 0;
+                        for (SslDataRecord sslDataRecord : cpuServicesPayload.getSslDataRecords()) {
+                            sizeOfDataItem = (short) (sslDataRecord.getLengthInWords() * (short) 2); // NOTE(review): only the last record's size survives the loop and is written below
+                            length += sizeOfDataItem;
+                        }
+                        buf.writeShort(length);
+                        buf.writeShort(cpuServicesPayload.getSslId().getCode());
+                        buf.writeShort(cpuServicesPayload.getSslIndex());
+                        buf.writeShort(sizeOfDataItem);
+                        buf.writeShort(cpuServicesPayload.getSslDataRecords().size());
+                        // Output any sort of ssl list items, if there are any.
+                        for (SslDataRecord sslDataRecord : cpuServicesPayload.getSslDataRecords()) {
+                            if(sslDataRecord instanceof SslModuleIdentificationDataRecord) {
+                                SslModuleIdentificationDataRecord midr = (SslModuleIdentificationDataRecord) sslDataRecord;
+                                buf.writeShort(midr.getIndex());
+                                byte[] articleNumberBytes = midr.getArticleNumber().getBytes(StandardCharsets.UTF_8);
+                                // An array full of 20 spaces.
+                                byte[] data = new byte[]{0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20,
+                                    0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20};
+                                // Copy at most 20 bytes; a shorter article number keeps the trailing space padding instead of overrunning the source array.
+                                System.arraycopy(articleNumberBytes, 0, data, 0, Math.min(articleNumberBytes.length, data.length));
+                                buf.writeBytes(data);
+                                buf.writeShort(midr.getModuleOrOsVersion());
+                                buf.writeShort(midr.getPgDescriptionFileVersion());
+                            }
+                        }
+                    }
+                    break;
             }
         }
     }
@@ -234,6 +283,9 @@ public class S7Protocol extends ChannelDuplexHandler {
                 case SETUP_COMMUNICATION:
                     encodeParameterSetupCommunication(buf, (SetupCommunicationParameter) s7Parameter);
                     break;
+                case CPU_SERVICES:
+                    encodeCpuServicesParameter(buf, (CpuServicesParameter) s7Parameter);
+                    break;
                 default:
                     logger.error("writing this parameter type not implemented");
             }
@@ -280,6 +332,30 @@ public class S7Protocol extends ChannelDuplexHandler {
         }
     }
 
+    private void encodeCpuServicesParameter(ByteBuf buf, CpuServicesParameter parameter) {
+        // Output the header for a CPU Services parameter.
+        buf.writeByte(0x01);
+        buf.writeByte(0x12);
+        // Length of the parameter: bytes written after this one (4 for a request parameter, otherwise 8 to cover the response fields below).
+        buf.writeByte((parameter instanceof CpuServicesRequestParameter) ? 0x04 : 0x08);
+        // Is this a request or a response?
+        buf.writeByte((parameter instanceof CpuServicesRequestParameter) ? 0x11 : 0x12);
+        // Request (0x40) or response (0x80) flag, OR-ed with the function group code.
+        byte nextByte = (byte) (((parameter instanceof CpuServicesRequestParameter) ?
+            (byte) 0x40 : (byte) 0x80) | parameter.getFunctionGroup().getCode());
+        buf.writeByte(nextByte);
+        buf.writeByte(parameter.getSubFunctionGroup().getCode());
+        buf.writeByte(parameter.getSequenceNumber());
+
+        // A response parameter has some more fields.
+        if((parameter instanceof CpuServicesResponseParameter)) {
+            CpuServicesResponseParameter responseParameter = (CpuServicesResponseParameter) parameter;
+            buf.writeByte(responseParameter.getDataUnitReferenceNumber());
+            buf.writeByte(responseParameter.isLastDataUnit() ? 0x00 : 0x01); // NOTE(review): 0x00 is written when this IS the last data unit - confirm against the protocol
+            buf.writeShort(responseParameter.getError().getCode());
+        }
+    }
+
     private void encodeS7AnyParameterItem(ByteBuf buf, S7AnyVarParameterItem s7AnyRequestItem) {
         buf.writeByte(s7AnyRequestItem.getSpecificationType().getCode());
         // Length of this item (excluding spec type and length)
@@ -356,7 +432,7 @@ public class S7Protocol extends ChannelDuplexHandler {
             i += S7SizeHelper.getParameterLength(parameter);
         }
 
-        List<S7Payload> s7Payloads = decodePayloads(userData, isResponse, userDataLength, readWriteVarParameter);
+        List<S7Payload> s7Payloads = decodePayloads(userData, isResponse, userDataLength, s7Parameters);
 
         logger.debug("S7 Message with id {} received", tpduReference);
 
@@ -387,13 +463,35 @@ public class S7Protocol extends ChannelDuplexHandler {
 
                 if(responseMessage != null) {
                     out.add(responseMessage);
+
+                    // If this is a USER_DATA packet the probability is high that this is
+                    // a response to the identification request, we have to handle that.
+                    if(responseMessage.getMessageType() == MessageType.USER_DATA) {
+                        for (S7Payload payload : responseMessage.getPayloads()) {
+                            if(payload instanceof CpuServicesPayload) {
+                                handleIdentifyRemote(ctx, (CpuServicesPayload) payload);
+                            }
+                        }
+                    }
                 }
 
                 // Eventually send the next message (if there is one).
                 trySendingMessages(ctx);
             }
+
         } else {
-            // TODO: Find out if there is any situation in which a request is sent from the PLC
+            // CpuService responses are encoded as requests.
+            for (S7Parameter s7Parameter : s7Parameters) {
+                // Only if we have a response parameter, the payload is a response payload.
+                if(s7Parameter instanceof CpuServicesResponseParameter) {
+                    for (S7Payload s7Payload : s7Payloads) {
+                        if(s7Payload instanceof CpuServicesPayload) {
+                            CpuServicesPayload cpuServicesPayload = (CpuServicesPayload) s7Payload;
+                            handleIdentifyRemote(ctx, cpuServicesPayload);
+                        }
+                    }
+                }
+            }
             out.add(new S7RequestMessage(messageType, tpduReference, s7Parameters, s7Payloads, null));
         }
     }
@@ -406,46 +504,126 @@ public class S7Protocol extends ChannelDuplexHandler {
         logger.info("S7Connection established pdu-size {}, max-amq-caller {}, " +
                 "max-amq-callee {}", pduSize, maxAmqCaller, maxAmqCallee);
 
-        // Send an event that setup is complete.
-        ctx.channel().pipeline().fireUserEventTriggered(new S7ConnectedEvent());
+        // Prepare a message to request the remote to identify itself.
+        S7RequestMessage identifyRemoteMessage = new S7RequestMessage(MessageType.USER_DATA, (short) 2,
+            Collections.singletonList(new CpuServicesRequestParameter(
+                CpuServicesParameterFunctionGroup.CPU_FUNCTIONS,
+                CpuServicesParameterSubFunctionGroup.READ_SSL, (byte) 0)),
+            Collections.singletonList(new CpuServicesPayload(DataTransportErrorCode.OK, SslId.MODULE_IDENTIFICATION,
+                (short) 0x0000)), null);
+        ctx.channel().writeAndFlush(identifyRemoteMessage);
     }
 
-    private List<S7Payload> decodePayloads(ByteBuf userData, boolean isResponse, short userDataLength, VarParameter readWriteVarParameter) {
-        int i = 0;
-        List<S7Payload> s7Payloads = new LinkedList<>();
-        if (readWriteVarParameter != null) {
-            List<VarPayloadItem> payloadItems = new LinkedList<>();
-
-            // Just keep on reading payloads until the provided length is read.
-            while (i < userDataLength) {
-                DataTransportErrorCode dataTransportErrorCode = DataTransportErrorCode.valueOf(userData.readByte());
-                // This is a response to a WRITE_VAR request (It only contains the return code for every sent item.
-                if ((readWriteVarParameter.getType() == ParameterType.WRITE_VAR) && isResponse) {
-                    // Initialize a rudimentary payload (This is updated in the Plc4XS7Protocol class
-                    VarPayloadItem payload = new VarPayloadItem(dataTransportErrorCode, null, null);
-                    payloadItems.add(payload);
-                    i += 1;
-                }
-                // This is a response to a READ_VAR request.
-                else if ((readWriteVarParameter.getType() == ParameterType.READ_VAR) && isResponse) {
-                    DataTransportSize dataTransportSize = DataTransportSize.valueOf(userData.readByte());
-                    short length = (dataTransportSize.isSizeInBits()) ?
-                        (short) Math.ceil(userData.readShort() / 8.0) : userData.readShort();
-                    byte[] data = new byte[length];
-                    userData.readBytes(data);
-                    // Initialize a rudimentary payload (This is updated in the Plc4XS7Protocol class
-                    VarPayloadItem payload = new VarPayloadItem(dataTransportErrorCode, dataTransportSize, data);
-                    payloadItems.add(payload);
-                    i += S7SizeHelper.getPayloadLength(payload);
+    private void handleIdentifyRemote(ChannelHandlerContext ctx, CpuServicesPayload cpuServicesPayload) {
+        controllerType = S7ControllerType.S7_ANY; // fallback when no module identification record with index 0x0001 is present
+        for (SslDataRecord sslDataRecord : cpuServicesPayload.getSslDataRecords()) {
+            if(sslDataRecord instanceof SslModuleIdentificationDataRecord) {
+                SslModuleIdentificationDataRecord sslModuleIdentificationDataRecord =
+                    (SslModuleIdentificationDataRecord) sslDataRecord;
+                if(sslModuleIdentificationDataRecord.getIndex() == (short) 0x0001) {
+                    controllerType = lookupControllerType(sslModuleIdentificationDataRecord.getArticleNumber());
                 }
             }
+        }
+        logger.info("Successfully connected to S7: {}", controllerType.name());
+        logger.info("- max amq caller: {}", maxAmqCaller);
+        logger.info("- max amq callee: {}", maxAmqCallee);
+        logger.info("- pdu size: {}", pduSize);
 
-            VarPayload varPayload = new VarPayload(readWriteVarParameter.getType(), payloadItems);
-            s7Payloads.add(varPayload);
+        // Send an event that connection setup is complete.
+        ctx.channel().pipeline().fireUserEventTriggered(new S7ConnectedEvent());
+    }
+
+    private List<S7Payload> decodePayloads(ByteBuf userData, boolean isResponse, short userDataLength, List<S7Parameter> s7Parameters) {
+        List<S7Payload> s7Payloads = new LinkedList<>();
+        for (S7Parameter s7Parameter : s7Parameters) { // one payload per Var/CpuServices parameter; other parameter types add none
+            if(s7Parameter instanceof VarParameter) {
+                VarParameter readWriteVarParameter = (VarParameter) s7Parameter;
+                VarPayload varPayload = decodeVarPayload(userData, isResponse, userDataLength, readWriteVarParameter);
+                s7Payloads.add(varPayload);
+            } else if(s7Parameter instanceof CpuServicesParameter) {
+                CpuServicesParameter cpuServicesParameter = (CpuServicesParameter) s7Parameter;
+                CpuServicesPayload cpuServicesPayload = decodeCpuServicesPayload(userData, isResponse, userDataLength,
+                    cpuServicesParameter);
+                s7Payloads.add(cpuServicesPayload);
+            }
         }
         return s7Payloads;
     }
 
+    private VarPayload decodeVarPayload(ByteBuf userData, boolean isResponse, short userDataLength,
+                                        VarParameter readWriteVarParameter) {
+        List<VarPayloadItem> payloadItems = new LinkedList<>();
+
+        // Just keep on reading payloads until the provided length is read.
+        int i = 0;
+        while (i < userDataLength) { // NOTE(review): if isResponse is false neither branch runs, so i never advances while a byte is read per pass - confirm requests never reach here
+            DataTransportErrorCode dataTransportErrorCode = DataTransportErrorCode.valueOf(userData.readByte());
+            // This is a response to a WRITE_VAR request (it only contains the return code for every sent item).
+            if ((readWriteVarParameter.getType() == ParameterType.WRITE_VAR) && isResponse) {
+                // Initialize a rudimentary payload (this is updated in the Plc4XS7Protocol class).
+                VarPayloadItem payload = new VarPayloadItem(dataTransportErrorCode, null, null);
+                payloadItems.add(payload);
+                i += 1;
+            }
+            // This is a response to a READ_VAR request.
+            else if ((readWriteVarParameter.getType() == ParameterType.READ_VAR) && isResponse) {
+                DataTransportSize dataTransportSize = DataTransportSize.valueOf(userData.readByte());
+                short length = (dataTransportSize.isSizeInBits()) ? // a bit count is rounded up to whole bytes
+                    (short) Math.ceil(userData.readShort() / 8.0) : userData.readShort();
+                byte[] data = new byte[length];
+                userData.readBytes(data);
+                // Initialize a rudimentary payload (this is updated in the Plc4XS7Protocol class).
+                VarPayloadItem payload = new VarPayloadItem(dataTransportErrorCode, dataTransportSize, data);
+                payloadItems.add(payload);
+                i += S7SizeHelper.getPayloadLength(payload);
+            }
+        }
+
+        return new VarPayload(readWriteVarParameter.getType(), payloadItems);
+    }
+
+    /**
+     * Decodes the payload of a CPU_SERVICES message.
+     * <p>
+     * The payload starts with a return code, a transport size, a 16 bit length, the SSL id and the SSL index.
+     * If the length is 4 nothing else follows. If it is at least 8, a partial list header and the data records
+     * follow; every record is decoded as a {@link SslModuleIdentificationDataRecord}.
+     *
+     * @param userData             buffer positioned at the start of the payload.
+     * @param isResponse           currently not evaluated.
+     * @param userDataLength       currently not evaluated.
+     * @param cpuServicesParameter currently not evaluated.
+     * @return the decoded payload, or {@code null} if the length field has an unexpected value.
+     */
+    private CpuServicesPayload decodeCpuServicesPayload(ByteBuf userData, boolean isResponse, short userDataLength,
+                                                        CpuServicesParameter cpuServicesParameter) {
+        DataTransportErrorCode returnCode = DataTransportErrorCode.valueOf(userData.readByte());
+        DataTransportSize dataTransportSize = DataTransportSize.valueOf(userData.readByte());
+        if (dataTransportSize != DataTransportSize.OCTET_STRING) {
+            // Decoding continues regardless, the mismatch is only reported.
+            logger.warn("Expecting OCTET_STRING as transport size of a CPU_SERVICES payload, but got {}",
+                dataTransportSize);
+        }
+        short length = userData.readShort();
+        SslId sslId = SslId.valueOf(userData.readShort());
+        short sslIndex = userData.readShort();
+
+        // If the length is 4 there is no `partial list length in bytes` and `partial list count` parameters.
+        if (length == 4) {
+            return new CpuServicesPayload(returnCode, sslId, sslIndex);
+        }
+        // If the length is not 4, then it has to be at least 8. Everything else is an error.
+        if (length < 8) {
+            logger.error("Length of a CPU_SERVICES payload should be 4 or at least 8, but was {}", length);
+            return null;
+        }
+
+        // TODO: We should probably ensure we don't read more than this.
+        // Partial list length: read to advance the buffer, the value itself is currently not used.
+        userData.readShort();
+        short partialListCount = userData.readShort();
+        List<SslDataRecord> sslDataRecords = new LinkedList<>();
+        for (int i = 0; i < partialListCount; i++) {
+            short index = userData.readShort();
+            // The article number is a fixed-width 20 byte field; surrounding whitespace is trimmed.
+            byte[] articleNumberBytes = new byte[20];
+            userData.readBytes(articleNumberBytes);
+            String articleNumber = new String(articleNumberBytes, StandardCharsets.UTF_8).trim();
+            short bgType = userData.readShort();
+            short moduleOrOsVersion = userData.readShort();
+            short pgDescriptionFileVersion = userData.readShort();
+            sslDataRecords.add(new SslModuleIdentificationDataRecord(
+                index, articleNumber, bgType, moduleOrOsVersion, pgDescriptionFileVersion));
+        }
+        return new CpuServicesPayload(returnCode, sslId, sslIndex, sslDataRecords);
+    }
+
     private S7Parameter decodeParameter(ByteBuf in, boolean isResponse, int restLength) {
         ParameterType parameterType = ParameterType.valueOf(in.readByte());
         if (parameterType == null) {
@@ -454,12 +632,7 @@ public class S7Protocol extends ChannelDuplexHandler {
         }
         switch (parameterType) {
             case CPU_SERVICES:
-                // Just read in the rest of the header as content of this parameter.
-                // Will have to do a lot more investigation on how this parameter is
-                // constructed.
-                byte[] cpuServices = new byte[restLength - 1];
-                in.readBytes(cpuServices);
-                return null;
+                return decodeCpuServicesParameter(in);
             case READ_VAR:
             case WRITE_VAR:
                 List<VarParameterItem> varParamameter;
@@ -485,6 +658,43 @@ public class S7Protocol extends ChannelDuplexHandler {
         return null;
     }
 
+    /**
+     * Decodes a CPU_SERVICES parameter.
+     * <p>
+     * The parameter starts with the fixed header 0x0112, followed by the parameter length (4 or 8), one skipped
+     * byte, a combined type/function-group byte, the sub function group and a sequence number. Response
+     * parameters additionally carry a data unit reference number, a last-data-unit flag and an error code.
+     *
+     * @param in buffer positioned directly behind the parameter type byte.
+     * @return a {@link CpuServicesRequestParameter} or {@link CpuServicesResponseParameter}, or {@code null}
+     * if the header or the parameter length has an unexpected value.
+     */
+    private CpuServicesParameter decodeCpuServicesParameter(ByteBuf in) {
+        if (in.readShort() != 0x0112) {
+            logger.error("Expecting 0x0112 for CPU_SERVICES parameter");
+            return null;
+        }
+        byte parameterLength = in.readByte();
+        if ((parameterLength != 4) && (parameterLength != 8)) {
+            logger.error("Parameter length should be 4 or 8, but was {}", parameterLength);
+            return null;
+        }
+        // Skipping this as it sort of contains redundant information.
+        in.readByte();
+        byte typeAndFunctionGroup = in.readByte();
+        // If bit 7 (mask 0x40) is set, it's a request (if bit 8, mask 0x80, is set it's a response).
+        // The mask must be this single bit: a wider mask also matches function group bits.
+        boolean requestParameter = (typeAndFunctionGroup & 0x40) != 0;
+        // The last 4 bits contain the function group value.
+        byte functionGroupCode = (byte) (typeAndFunctionGroup & 0x0F);
+        CpuServicesParameterFunctionGroup functionGroup =
+            CpuServicesParameterFunctionGroup.valueOf(functionGroupCode);
+        CpuServicesParameterSubFunctionGroup subFunctionGroup =
+            CpuServicesParameterSubFunctionGroup.valueOf(in.readByte());
+        byte sequenceNumber = in.readByte();
+        if (requestParameter) {
+            return new CpuServicesRequestParameter(functionGroup, subFunctionGroup, sequenceNumber);
+        }
+        // Only responses carry the following additional fields.
+        byte dataUnitReferenceNumber = in.readByte();
+        boolean lastDataUnit = (in.readByte() == 0x00);
+        ParameterError error = ParameterError.valueOf(in.readShort());
+        return new CpuServicesResponseParameter(functionGroup, subFunctionGroup, sequenceNumber,
+            dataUnitReferenceNumber, lastDataUnit, error);
+    }
+
     private List<VarParameterItem> decodeReadWriteVarParameter(ByteBuf in, byte numItems) {
         List<VarParameterItem> items = new LinkedList<>();
         for (int i = 0; i < numItems; i++) {
@@ -565,4 +775,25 @@ public class S7Protocol extends ChannelDuplexHandler {
         ctx.flush();
     }
 
+    /**
+     * Derives the controller type from an article number.
+     * <p>
+     * Only article numbers starting with {@code "6ES7 "} are evaluated; the character directly behind that
+     * prefix selects the controller family.
+     *
+     * @param articleNumber article number as reported by the device, may be {@code null}.
+     * @return the matching controller type, or {@link S7ControllerType#S7_ANY} if the article number is
+     * missing, has a different prefix, ends right after the prefix or names an unknown family.
+     */
+    private S7ControllerType lookupControllerType(String articleNumber) {
+        final String prefix = "6ES7 ";
+        // Guard against missing or truncated article numbers instead of failing with an exception.
+        if ((articleNumber == null) || !articleNumber.startsWith(prefix)
+            || (articleNumber.length() <= prefix.length())) {
+            return S7ControllerType.S7_ANY;
+        }
+
+        char model = articleNumber.charAt(prefix.length());
+        switch (model) {
+            case '2':
+                return S7ControllerType.S7_1200;
+            case '5':
+                return S7ControllerType.S7_1500;
+            case '3':
+                return S7ControllerType.S7_300;
+            case '4':
+                return S7ControllerType.S7_400;
+            default:
+                logger.info("Looking up unknown article number {}", articleNumber);
+                return S7ControllerType.S7_ANY;
+        }
+    }
+
 }
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/CpuServicesParameter.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/CpuServicesParameter.java
index 7914179..e87ea4e 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/CpuServicesParameter.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/params/CpuServicesParameter.java
@@ -18,14 +18,37 @@ under the License.
 */
 package org.apache.plc4x.java.s7.netty.model.params;
 
+import org.apache.plc4x.java.s7.netty.model.types.CpuServicesParameterSubFunctionGroup;
+import org.apache.plc4x.java.s7.netty.model.types.CpuServicesParameterFunctionGroup;
 import org.apache.plc4x.java.s7.netty.model.types.ParameterType;
 
-public class CpuServicesParameter implements S7Parameter {
+/**
+ * Base class of the CPU_SERVICES parameters. It holds the values shared by all of them: the function group,
+ * the sub function group and the sequence number. Instances are immutable.
+ */
+public abstract class CpuServicesParameter implements S7Parameter {
+
+    // Assigned once in the constructor and never changed afterwards.
+    private final CpuServicesParameterFunctionGroup functionGroup;
+    private final CpuServicesParameterSubFunctionGroup subFunctionGroup;
+    private final byte sequenceNumber;
+
+    /**
+     * @param functionGroup    function group of the parameter.
+     * @param subFunctionGroup sub function group of the parameter.
+     * @param sequenceNumber   sequence number of the parameter.
+     */
+    public CpuServicesParameter(CpuServicesParameterFunctionGroup functionGroup, CpuServicesParameterSubFunctionGroup subFunctionGroup, byte sequenceNumber) {
+        this.functionGroup = functionGroup;
+        this.subFunctionGroup = subFunctionGroup;
+        this.sequenceNumber = sequenceNumber;
+    }
 
     @Override
     public ParameterType getType() {
         return ParameterType.CPU_SERVICES;
     }
 
+    /**
+     * @return the function group passed to the constructor.
+     */
+    public CpuServicesParameterFunctionGroup getFunctionGroup() {
+        return functionGroup;
+    }
+
+    /**
+     * @return the sub function group passed to the constructor.
+     */
+    public CpuServicesParameterSubFunctionGroup getSubFunctionGroup() {
+        return subFunctionGroup;
+    }
+
+    /**
+     * @return the sequence number passed to the constructor.
+     */
+    public byte getSequenceNumber() {
+        return sequenceNumber;
+    }
 
 }
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelper.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelper.java
index ea18692..afd4951 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelper.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelper.java
@@ -18,13 +18,16 @@ specific language governing permissions and limitations
 under the License.
 */
 
+import org.apache.plc4x.java.s7.netty.model.params.CpuServicesRequestParameter;
 import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
 import org.apache.plc4x.java.s7.netty.model.params.VarParameter;
 import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
 import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.CpuServicesPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
 import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslDataRecord;
 import org.apache.plc4x.java.s7.netty.model.types.VariableAddressingMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +64,17 @@ public class S7SizeHelper {
                 for (VarPayloadItem payloadItem : varPayload.getItems()) {
                     l += getPayloadLength(payloadItem);
                 }
+            } else if(payload instanceof CpuServicesPayload) {
+                CpuServicesPayload cpuServicesPayload = (CpuServicesPayload) payload;
+                if(cpuServicesPayload.getSslDataRecords().isEmpty()) {
+                    return 8;
+                } else {
+                    short length = 0;
+                    for (SslDataRecord sslDataRecord : cpuServicesPayload.getSslDataRecords()) {
+                        length += sslDataRecord.getLengthInWords() * 2;
+                    }
+                    return length;
+                }
             }
         }
         return l;
@@ -77,6 +91,12 @@ public class S7SizeHelper {
                 return getReadWriteVarParameterLength((VarParameter) parameter);
             case SETUP_COMMUNICATION:
                 return 8;
+            case CPU_SERVICES:
+                if(parameter instanceof CpuServicesRequestParameter) {
+                    return 8;
+                } else {
+                    return 12;
+                }
             default:
                 logger.error("Not implemented");
                 return 0;
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcConnectionIT.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcConnectionIT.java
index 0843f5b..cdbcea0 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcConnectionIT.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcConnectionIT.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.s7.connection;
 
 import io.netty.channel.Channel;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.s7.types.S7ControllerType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -46,7 +47,7 @@ public class S7PlcConnectionIT {
     @Before
     public void setUp() {
         try {
-            s7PlcConnection = new S7PlcTestConnection(1, 2, "");
+            s7PlcConnection = new S7PlcTestConnection(1, 2, "", S7ControllerType.S7_1500);
             s7PlcConnection.connect();
             channel = s7PlcConnection.getChannel();
         } catch (PlcConnectionException e) {
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
index 80b7da7..44cff0a 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.plc4x.java.base.connection.TestChannelFactory;
 import org.apache.plc4x.java.base.events.ConnectEvent;
+import org.apache.plc4x.java.s7.types.S7ControllerType;
 
 import java.io.File;
 import java.io.IOException;
@@ -32,8 +33,11 @@ import java.io.InputStream;
 
 public class S7PlcTestConnection extends S7PlcConnection {
 
-    public S7PlcTestConnection(int rack, int slot, String params) {
+    private S7ControllerType controllerType;
+
+    /**
+     * Creates a test connection that runs on a {@link TestChannelFactory}.
+     *
+     * @param rack           passed on to the super class constructor.
+     * @param slot           passed on to the super class constructor.
+     * @param params         passed on to the super class constructor.
+     * @param controllerType controller type stored for this test connection.
+     */
+    public S7PlcTestConnection(int rack, int slot, String params, S7ControllerType controllerType) {
         super(new TestChannelFactory(), rack, slot, params);
+        this.controllerType = controllerType;
     }
 
     /*
@@ -88,6 +92,35 @@ public class S7PlcTestConnection extends S7PlcConnection {
         byte[] setupCommunicationResponse = readPcapFile(
             "org/apache/plc4x/java/s7/connection/s7-setup-communication-response.pcap");
         channel.writeInbound(Unpooled.wrappedBuffer(setupCommunicationResponse));
+
+        // Read a S7 CPU Functions request.
+        writtenData = channel.readOutbound();
+        byte[] cpuFunctionsRequest = new byte[writtenData.readableBytes()];
+        writtenData.readBytes(cpuFunctionsRequest);
+        // TODO: Check the content of the S7 CPU Functions request.
+
+        // Send an S7 CPU Functions response back to the pipeline.
+        byte[] cpuFunctionsResponse = readPcapFile(
+            "org/apache/plc4x/java/s7/connection/s7-cpu-functions-response.pcap");
+        // Override the type of reported S7 device.
+        switch (controllerType) {
+            case S7_1200:
+                cpuFunctionsResponse[48] = '2';
+                break;
+            case S7_1500:
+                cpuFunctionsResponse[48] = '5';
+                break;
+            case S7_300:
+                cpuFunctionsResponse[48] = '3';
+                break;
+            case S7_400:
+                cpuFunctionsResponse[48] = '4';
+                break;
+            default:
+                cpuFunctionsResponse[48] = '1';
+                break;
+        }
+        channel.writeInbound(Unpooled.wrappedBuffer(cpuFunctionsResponse));
     }
 
     public static byte[] toByteArray(int[] in) {
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/Plc4XS7ProtocolTest.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/Plc4XS7ProtocolTest.java
index f1e5eb7..4ab29c6 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/Plc4XS7ProtocolTest.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/Plc4XS7ProtocolTest.java
@@ -20,10 +20,10 @@ package org.apache.plc4x.java.s7.netty;
 
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcRequest;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
 import org.apache.plc4x.java.netty.NettyTestBase;
 import org.apache.plc4x.java.s7.model.S7Address;
 import org.apache.plc4x.java.s7.model.S7BitAddress;
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/model/params/S7ParameterTests.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/model/params/S7ParameterTests.java
index faaf627..9dd3c10 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/model/params/S7ParameterTests.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/model/params/S7ParameterTests.java
@@ -21,10 +21,7 @@ package org.apache.plc4x.java.s7.netty.model.params;
 
 import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
 import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
-import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
-import org.apache.plc4x.java.s7.netty.model.types.ParameterType;
-import org.apache.plc4x.java.s7.netty.model.types.SpecificationType;
-import org.apache.plc4x.java.s7.netty.model.types.TransportSize;
+import org.apache.plc4x.java.s7.netty.model.types.*;
 import org.apache.plc4x.test.FastTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -60,7 +57,8 @@ public class S7ParameterTests {
     @Test
     @Category(FastTests.class)
     public void cpuServicesParameter() {
-        CpuServicesParameter cpuParameter = new CpuServicesParameter();
+        CpuServicesParameter cpuParameter = new CpuServicesRequestParameter(
+            CpuServicesParameterFunctionGroup.CPU_FUNCTIONS, CpuServicesParameterSubFunctionGroup.READ_SSL, (byte) 0);
         assertThat("Unexpected parameter type", cpuParameter.getType(), equalTo(ParameterType.CPU_SERVICES));
     }
     
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelperTest.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelperTest.java
index fa831c7..40e9562 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelperTest.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/util/S7SizeHelperTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.core.Is.is;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 
-import org.apache.plc4x.java.s7.netty.model.params.CpuServicesParameter;
+import org.apache.plc4x.java.s7.netty.model.params.CpuServicesRequestParameter;
 import org.apache.plc4x.java.s7.netty.model.params.SetupCommunicationParameter;
 import org.apache.plc4x.java.s7.netty.model.params.VarParameter;
 import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
@@ -60,7 +60,8 @@ public class S7SizeHelperTest {
     @Test
     public void getParameterLengthTest() {
         assertThat(S7SizeHelper.getParameterLength(
-            new CpuServicesParameter()), is((short) 0));
+            new CpuServicesRequestParameter(CpuServicesParameterFunctionGroup.CPU_FUNCTIONS,
+                CpuServicesParameterSubFunctionGroup.READ_SSL, (byte) 0)), is((short) 8));
         assertThat(S7SizeHelper.getParameterLength(
             new SetupCommunicationParameter((short) 8, (short)8, (short)250)), is((short) 8));
         assertThat(S7SizeHelper.getParameterLength(
diff --git a/src/site/asciidoc/index.adoc b/src/site/asciidoc/index.adoc
index 31da326..d68a0d1 100644
--- a/src/site/asciidoc/index.adoc
+++ b/src/site/asciidoc/index.adoc
@@ -17,10 +17,13 @@
 :imagesdir: img/
 :icons: font
 
-== PLC4X: Just like OPC-UA, but totally different
+== PLC4X: Universal Protocol Adapter for Industrial IoT
 
 image::iot-lab.jpg[width=480, float=right]
 
+[.lead]
+Just like OPC-UA, but totally different.
+
 We are currently probably experiencing the greatest advances in the way we process information in human history.
 
 While these advances are taking over more and more parts of our world, it seems one large segment has continuously been missing out on all the fun.
@@ -53,14 +56,14 @@ At first we will be concentrating on providing adapters for the most widely used
 - icon:wrench[role=yellow] link:protocols/delta-v/index.html[Emerson DeltaV (UDP)]
 - icon:wrench[role=yellow] link:protocols/ethernet-ip/index.html[EtherNet/IP (TCP)]
 - icon:check[role=green] link:protocols/modbus/index.html[Modbus (TCP)]
-- icon:times[role=red] link:protocols/opc-ua/index.html[OPC-UA (TCP)]
-- icon:check[role=green] link:protocols/s7/index.html[Siemens S7 (TCP)]
+- icon:pause-circle[role=light-gray] link:protocols/opc-ua/index.html[OPC-UA]
+- icon:check[role=green] link:protocols/s7/index.html[S7 (TCP)]
 
 We are planning on providing support for the following programming languages:
 
 - icon:check[role=green] link:plc4j/index.html[Java]
-- icon:times[role=red] link:plc4s/index.html[Scala]
-- icon:times[role=red] link:plc4c/index.html[C/C++]
+- icon:pause-circle[role=light-gray] link:plc4s/index.html[Scala]
+- icon:pause-circle[role=light-gray] link:plc4c/index.html[C/C++]
 
 Beyond implementing the pure adapters we have already implemented some of the planned integration modules to popular projects in the Apache IoT world such as:
 
@@ -69,8 +72,8 @@ Beyond implementing the pure adapters we have already implemented some of the pl
 a|image::apache_edgent_logo.png[width=80%,link=https://edgent.apache.org] a|image::apache_camel_logo.png[width=80%,link=https://camel.apache.org] a|image::apache_kafka_logo.png[width=80%,link=https://kafka.apache.org]
 |icon:check[role=green] https://edgent.apache.org[Apache Edgent (Incubating)] |icon:check[role=green] https://camel.apache.org[Apache Camel] |icon:wrench[role=yellow] https://kafka.apache.org[Apache Kafka]
 
-a|image::apache_mynewt_logo.png[width=80%,link=https://mynewt.apache.org] a|image::apache_nifi_logo.svg[width=80%,link=https://nifi.apache.org] a|image::apache_brooklyn_logo.png[width=80%,link=https://brooklyn.apache.org]
-|icon:times[role=red] https://mynewt.apache.org[Apache Mynewt] |icon:times[role=red] https://nifi.apache.org[Apache Nifi] |icon:times[role=red] https://brooklyn.apache.org[Apache Brooklyn]
+a|image::apache_nifi_logo.svg[width=80%,link=https://nifi.apache.org] a|image::apache_brooklyn_logo.png[width=80%,link=https://brooklyn.apache.org] a|image::apache_mynewt_logo.png[width=80%,link=https://mynewt.apache.org]
+|icon:pause-circle[role=light-gray] https://nifi.apache.org[Apache Nifi] |icon:pause-circle[role=light-gray] https://brooklyn.apache.org[Apache Brooklyn] |icon:pause-circle[role=light-gray] https://mynewt.apache.org[Apache Mynewt]
 |===
 
 This greatly reduces the barriers and the learning curve for creating industrial IoT applications.
\ No newline at end of file


[incubator-plc4x] 01/02: Merge

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit a46b4c952a9fc42fe7c0c081b7b3af1704f9bc80
Merge: 6f8287f 78853ad
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Aug 17 15:47:56 2018 +0200

    Merge

 .../apache/plc4x/java/api/ImmutabilityTest.java    | 103 +++++++++++++++++++++
 .../java/base}/messages/PlcRequestContainer.java   |  42 +++++----
 .../base}/messages/PlcRequestContainerTest.java    |   8 +-
 .../java/s7/netty/events/S7IdentifiedEvent.java    |  22 +++++
 .../model/params/CpuServicesRequestParameter.java  |  30 ++++++
 .../model/params/CpuServicesResponseParameter.java |  50 ++++++++++
 .../netty/model/payloads/CpuServicesPayload.java   |  71 ++++++++++++++
 .../netty/model/payloads/ssls/SslDataRecord.java   |  25 +++++
 .../ssls/SslModuleIdentificationDataRecord.java    |  66 +++++++++++++
 .../types/CpuServicesParameterFunctionGroup.java   |  59 ++++++++++++
 .../CpuServicesParameterSubFunctionGroup.java      |  59 ++++++++++++
 .../plc4x/java/s7/netty/model/types/SslId.java     |  99 ++++++++++++++++++++
 .../s7/connection/s7-cpu-functions-response.pcap   | Bin 0 -> 219 bytes
 13 files changed, 612 insertions(+), 22 deletions(-)