You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/01 20:17:52 UTC

[incubator-plc4x] 22/35: - Added more tests - Fixed the exception handling in the S7Protocol

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch add-simple-mock-driver
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit b9240715a681f2693e0a7e4092e4f5156d33c761
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Thu Nov 1 10:50:14 2018 +0100

    - Added more tests
    - Fixed the exception handling in the S7Protocol
---
 .../plc4x/java/isotp/netty/IsoTPProtocol.java      |  12 +-
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   4 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java | 113 +++---
 .../java/s7/connection/S7PlcTestConnection.java    |   4 +
 .../apache/plc4x/java/s7/netty/S7ProtocolTest.java | 434 ++++++++++++++++++---
 plc4j/protocols/s7/src/test/resources/logback.xml  |   2 +-
 6 files changed, 460 insertions(+), 109 deletions(-)

diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/IsoTPProtocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/IsoTPProtocol.java
index f0dfe52..a6bdfad 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/IsoTPProtocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/IsoTPProtocol.java
@@ -90,7 +90,7 @@ public class IsoTPProtocol extends PlcMessageToMessageCodec<IsoOnTcpMessage, Tpd
 
     @Override
     protected void encode(ChannelHandlerContext ctx, Tpdu in, List<Object> out) {
-        logger.debug("ISO Transport Protocol Message sent");
+        logger.trace("ISO Transport Protocol Message sent");
 
         if (in == null) {
             return;
@@ -209,15 +209,15 @@ public class IsoTPProtocol extends PlcMessageToMessageCodec<IsoOnTcpMessage, Tpd
 
     @Override
     protected void decode(ChannelHandlerContext ctx, IsoOnTcpMessage in, List<Object> out) {
-        if (logger.isTraceEnabled()) {
-            logger.trace("Got Data: {}", ByteBufUtil.hexDump(in.getUserData()));
-        }
-        logger.debug("ISO TP Message received");
-
+        logger.trace("ISO TP Message received");
         if (in == null) {
             return;
         }
 
+        if (logger.isDebugEnabled()) {
+            logger.debug("Got Data: {}", ByteBufUtil.hexDump(in.getUserData()));
+        }
+
         ByteBuf userData = in.getUserData();
         if (userData.writerIndex() < 1) {
             return;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index 2f121de..fb8951b 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -42,6 +42,7 @@ import org.apache.plc4x.java.isotp.netty.model.types.TpduSize;
 import org.apache.plc4x.java.s7.netty.Plc4XS7Protocol;
 import org.apache.plc4x.java.s7.netty.S7Protocol;
 import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
+import org.apache.plc4x.java.s7.netty.strategies.DefaultS7MessageProcessor;
 import org.apache.plc4x.java.s7.netty.util.S7PlcFieldHandler;
 import org.apache.plc4x.java.s7.utils.S7TsapIdEncoder;
 import org.slf4j.Logger;
@@ -171,7 +172,8 @@ public class S7PlcConnection extends AbstractPlcConnection implements PlcReader,
                 });
                 pipeline.addLast(new IsoOnTcpProtocol());
                 pipeline.addLast(new IsoTPProtocol(callingTsapId, calledTsapId, paramPduSize));
-                pipeline.addLast(new S7Protocol(paramMaxAmqCaller, paramMaxAmqCallee, (short) paramPduSize.getValue()));
+                pipeline.addLast(new S7Protocol(paramMaxAmqCaller, paramMaxAmqCallee, (short) paramPduSize.getValue(),
+                    new DefaultS7MessageProcessor()));
                 pipeline.addLast(new Plc4XS7Protocol());
             }
         };
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
index f60bf57..606c1b6 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
@@ -26,6 +26,7 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolPayloadTooBigException;
 import org.apache.plc4x.java.isotp.netty.IsoTPProtocol;
 import org.apache.plc4x.java.isotp.netty.events.IsoTPConnectedEvent;
@@ -46,7 +47,6 @@ import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
 import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslDataRecord;
 import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslModuleIdentificationDataRecord;
 import org.apache.plc4x.java.s7.netty.model.types.*;
-import org.apache.plc4x.java.s7.netty.strategies.DefaultS7MessageProcessor;
 import org.apache.plc4x.java.s7.netty.strategies.S7MessageProcessor;
 import org.apache.plc4x.java.s7.netty.util.S7SizeHelper;
 import org.apache.plc4x.java.s7.types.S7ControllerType;
@@ -99,12 +99,12 @@ public class S7Protocol extends ChannelDuplexHandler {
     private PendingWriteQueue queue;
     private Map<Short, DataTpdu> sentButUnacknowledgedTpdus;
 
-    public S7Protocol(short requestedMaxAmqCaller, short requestedMaxAmqCallee, short requestedPduSize) {
+    public S7Protocol(short requestedMaxAmqCaller, short requestedMaxAmqCallee, short requestedPduSize, S7MessageProcessor messageProcessor) {
         this.maxAmqCaller = requestedMaxAmqCaller;
         this.maxAmqCallee = requestedMaxAmqCallee;
         this.pduSize = requestedPduSize;
+        this.messageProcessor = messageProcessor;
         sentButUnacknowledgedTpdus = new HashMap<>();
-        messageProcessor = new DefaultS7MessageProcessor();
     }
 
     @Override
@@ -163,49 +163,51 @@ public class S7Protocol extends ChannelDuplexHandler {
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
     @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-        if(msg instanceof S7Message) {
-            S7Message in = (S7Message) msg;
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+        try {
+            if(msg instanceof S7Message) {
+                S7Message in = (S7Message) msg;
 
-            // Give message processors to process the incoming message.
-            Collection<? extends S7Message> messages;
-            if((messageProcessor != null) && (in instanceof S7RequestMessage)) {
-                try {
+                // Give the message processor a chance to process the incoming message.
+                Collection<? extends S7Message> messages;
+                if ((messageProcessor != null) && (in instanceof S7RequestMessage)) {
                     messages = messageProcessor.processRequest((S7RequestMessage) in, pduSize);
-                } catch(Exception e) {
-                    logger.error("Error processing message", e);
-                    ctx.fireExceptionCaught(e);
-                    return;
+                } else {
+                    messages = Collections.singleton(in);
                 }
-            } else {
-                messages = Collections.singleton(in);
-            }
 
-            // Create a promise that has to be called multiple times.
-            PromiseCombiner promiseCombiner = new PromiseCombiner();
-            if(messages != null) {
+                // Create a promise that has to be called multiple times.
+                PromiseCombiner promiseCombiner = new PromiseCombiner();
                 for (S7Message message : messages) {
                     ByteBuf buf = Unpooled.buffer();
-                    writeS7Message(ctx, promise.channel(), promiseCombiner, message, buf);
+                    writeS7Message(promise.channel(), promiseCombiner, message, buf);
                 }
-            }
-            promiseCombiner.finish(promise);
+                promiseCombiner.finish(promise);
 
-            // Start sending the queue content.
-            trySendingMessages(ctx);
-        } else {
-            super.write(ctx, msg, promise);
+                // Start sending the queue content.
+                trySendingMessages(ctx);
+            }
+            // Especially during the phase of connection establishment, we might be sending
+            // messages of a lower level protocol, so if it's not S7, we forward it to the next
+            // in the pipeline and hope it can handle it. If no layer can handle it Netty will
+            // exceptionally complete the future.
+            else {
+                ctx.write(msg, promise);
+            }
+        } catch (Exception e) {
+            promise.setFailure(e);
         }
     }
 
-    private void writeS7Message(ChannelHandlerContext ctx, Channel channel, PromiseCombiner promiseCombiner, S7Message message, ByteBuf buf) {
+    private void writeS7Message(Channel channel, PromiseCombiner promiseCombiner,
+                                S7Message message, ByteBuf buf) throws PlcProtocolException {
         encodeHeader(message, buf);
         encodeParameters(message, buf);
         encodePayloads(message, buf);
 
         // Check if the message doesn't exceed the negotiated maximum size.
         if (buf.writerIndex() > pduSize) {
-            ctx.fireExceptionCaught(new PlcProtocolPayloadTooBigException("s7", pduSize, buf.writerIndex(), message));
+            throw new PlcProtocolPayloadTooBigException("s7", pduSize, buf.writerIndex(), message);
         } else {
             ChannelPromise subPromise = new DefaultChannelPromise(channel);
             queue.add(new DataTpdu(true, (byte) 0x01, Collections.emptyList(), buf, message), subPromise);
@@ -214,17 +216,20 @@ public class S7Protocol extends ChannelDuplexHandler {
         }
     }
 
-    private void encodePayloads(S7Message in, ByteBuf buf) {
-        for (S7Payload payload : in.getPayloads()) {
-            switch (payload.getType()) {
-                case WRITE_VAR:
-                    encodeWriteVarPayload((VarPayload) payload, buf);
-                    break;
-                case CPU_SERVICES:
-                    encodeCpuServicesPayload((CpuServicesPayload) payload, buf);
-                    break;
-                default:
-                    break;
+    private void encodePayloads(S7Message in, ByteBuf buf) throws PlcProtocolException {
+        if(in.getPayloads() != null) {
+            for (S7Payload payload : in.getPayloads()) {
+                switch (payload.getType()) {
+                    case WRITE_VAR:
+                        encodeWriteVarPayload((VarPayload) payload, buf);
+                        break;
+                    case CPU_SERVICES:
+                        encodeCpuServicesPayload((CpuServicesPayload) payload, buf);
+                        break;
+                    default:
+                        throw new PlcProtocolException("Writing payloads of type " +
+                            payload.getType().name() + " not implemented.");
+                }
             }
         }
     }
@@ -240,7 +245,8 @@ public class S7Protocol extends ChannelDuplexHandler {
         }
     }
 
-    private void encodeCpuServicesPayload(CpuServicesPayload cpuServicesPayload, ByteBuf buf) {
+    private void encodeCpuServicesPayload(CpuServicesPayload cpuServicesPayload, ByteBuf buf)
+            throws PlcProtocolException {
         buf.writeByte(cpuServicesPayload.getReturnCode().getCode());
         // This seems to be constantly set to this.
         buf.writeByte(DataTransportSize.OCTET_STRING.getCode());
@@ -253,7 +259,8 @@ public class S7Protocol extends ChannelDuplexHandler {
         }
         // The response payload contains a lot more information.
         else {
-            short length = 8;
+            throw new PlcProtocolException("Unexpected SZL Data Records");
+            /*short length = 8;
             short sizeOfDataItem = 0;
             for (SslDataRecord sslDataRecord : cpuServicesPayload.getSslDataRecords()) {
                 sizeOfDataItem = (short) (sslDataRecord.getLengthInWords() * (short) 2);
@@ -279,11 +286,11 @@ public class S7Protocol extends ChannelDuplexHandler {
                     buf.writeShort(midr.getModuleOrOsVersion());
                     buf.writeShort(midr.getPgDescriptionFileVersion());
                 }
-            }
+            }*/
         }
     }
 
-    private void encodeParameters(S7Message in, ByteBuf buf) {
+    private void encodeParameters(S7Message in, ByteBuf buf) throws PlcProtocolException {
         for (S7Parameter s7Parameter : in.getParameters()) {
             buf.writeByte(s7Parameter.getType().getCode());
             switch (s7Parameter.getType()) {
@@ -298,7 +305,8 @@ public class S7Protocol extends ChannelDuplexHandler {
                     encodeCpuServicesParameter(buf, (CpuServicesParameter) s7Parameter);
                     break;
                 default:
-                    logger.error("writing this parameter type not implemented");
+                    throw new PlcProtocolException("Writing parameters of type " +
+                        s7Parameter.getType().name() + " not implemented.");
             }
         }
     }
@@ -314,11 +322,12 @@ public class S7Protocol extends ChannelDuplexHandler {
         buf.writeShort(S7SizeHelper.getParametersLength(in.getParameters()));
         // Data field length
         buf.writeShort(S7SizeHelper.getPayloadsLength(in.getPayloads()));
-        if (in instanceof S7ResponseMessage) {
+        // Not sure why this is implemented, we should never be sending out responses.
+        /*if (in instanceof S7ResponseMessage) {
             S7ResponseMessage s7ResponseMessage = (S7ResponseMessage) in;
             buf.writeByte(s7ResponseMessage.getErrorClass());
             buf.writeByte(s7ResponseMessage.getErrorCode());
-        }
+        }*/
     }
 
     private void encodeParameterSetupCommunication(ByteBuf buf, SetupCommunicationParameter s7Parameter) {
@@ -329,7 +338,7 @@ public class S7Protocol extends ChannelDuplexHandler {
         buf.writeShort(s7Parameter.getPduLength());
     }
 
-    private void encodeParameterReadWriteVar(ByteBuf buf, VarParameter s7Parameter) {
+    private void encodeParameterReadWriteVar(ByteBuf buf, VarParameter s7Parameter) throws PlcProtocolException {
         List<VarParameterItem> items = s7Parameter.getItems();
         // PlcReadRequestItem count (Read one variable at a time)
         buf.writeByte((byte) items.size());
@@ -338,7 +347,8 @@ public class S7Protocol extends ChannelDuplexHandler {
             if (addressMode == VariableAddressingMode.S7ANY) {
                 encodeS7AnyParameterItem(buf, (S7AnyVarParameterItem) item);
             } else {
-                logger.error("writing this item type not implemented");
+                throw new PlcProtocolException("Writing VarParameterItems with addressing mode " +
+                    addressMode.name() + " not implemented");
             }
         }
     }
@@ -359,12 +369,13 @@ public class S7Protocol extends ChannelDuplexHandler {
         buf.writeByte(parameter.getSequenceNumber());
 
         // A response parameter has some more fields.
-        if(parameter instanceof CpuServicesResponseParameter) {
+        // Not sure why this is implemented, we should never be sending out responses.
+        /*if(parameter instanceof CpuServicesResponseParameter) {
             CpuServicesResponseParameter responseParameter = (CpuServicesResponseParameter) parameter;
             buf.writeByte(responseParameter.getDataUnitReferenceNumber());
             buf.writeByte(responseParameter.isLastDataUnit() ? 0x00 : 0x01);
             buf.writeShort(responseParameter.getError().getCode());
-        }
+        }*/
     }
 
     private void encodeS7AnyParameterItem(ByteBuf buf, S7AnyVarParameterItem s7AnyRequestItem) {
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
index 8a69334..70873b4 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/connection/S7PlcTestConnection.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.commons.io.IOUtils;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.base.connection.TestChannelFactory;
 import org.apache.plc4x.java.s7.types.S7ControllerType;
 import org.pcap4j.core.NotOpenException;
@@ -83,6 +84,9 @@ public class S7PlcTestConnection extends S7PlcConnection {
         super.sendChannelCreatedEvent();
 
         ByteBuf writtenData = channel.readOutbound();
+        if(writtenData == null) {
+            throw new PlcRuntimeException("Error reading initial channel output");
+        }
         byte[] connectionRequest = new byte[writtenData.readableBytes()];
         writtenData.readBytes(connectionRequest);
         // TODO: Check the content of the Iso TP connection request.
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/S7ProtocolTest.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/S7ProtocolTest.java
index 868b67b..7c8b416 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/S7ProtocolTest.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/S7ProtocolTest.java
@@ -18,73 +18,407 @@ under the License.
 */
 package org.apache.plc4x.java.s7.netty;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.plc4x.java.isotp.netty.model.IsoTPMessage;
-import org.apache.plc4x.java.isotp.netty.model.tpdus.Tpdu;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.isotp.netty.model.tpdus.DataTpdu;
 import org.apache.plc4x.java.netty.NettyTestBase;
-import org.apache.plc4x.java.s7.netty.model.types.MessageType;
-import org.apache.plc4x.test.FastTests;
+import org.apache.plc4x.java.s7.netty.model.messages.S7RequestMessage;
+import org.apache.plc4x.java.s7.netty.model.messages.SetupCommunicationRequestMessage;
+import org.apache.plc4x.java.s7.netty.model.params.CpuServicesRequestParameter;
+import org.apache.plc4x.java.s7.netty.model.params.VarParameter;
+import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
+import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.CpuServicesPayload;
+import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
+import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
+import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.ssls.SslModuleIdentificationDataRecord;
+import org.apache.plc4x.java.s7.netty.model.types.*;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.Collections;
 
-import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class S7ProtocolTest extends NettyTestBase {
 
-    private S7Protocol SUT;
+    private EmbeddedChannel SUT;
 
     @Before
     public void setUp() {
-        SUT = new S7Protocol((short) 1, (short) 1, (short) 256);
+        SUT = new EmbeddedChannel(new S7Protocol((short) 10, (short) 10, (short) 50, null));
     }
 
-   /* @Test
-    @Category(FastTests.class)
-    public void encode() throws Exception {
-        //TODO: finish me
-        LinkedList<Object> out = new LinkedList<>();
-        // TODO: Disabled for now ... have to reactivate it when I'm finished with PLC4X-29
-        SUT.write(null, new S7RequestMessage(
-            MessageType.ACK,
-            (short) 1,
-            singletonList(new VarParameter(ParameterType.WRITE_VAR, singletonList(new S7AnyVarParameterItem(
-                SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS, TransportSize.BOOL, (short) 1, (short) 1, (short) 1, (byte) 1
-            )))),
-            singletonList(new VarPayload(
-                ParameterType.WRITE_VAR,
-                singletonList(new VarPayloadItem(
-                    DataTransportErrorCode.OK,
-                    DataTransportSize.BYTE_WORD_DWORD, new byte[]{0})
-                ))
-            ), null), new DefaultChannelPromise(new EmbeddedChannel()));
-        assertThat(out, hasSize(1));
-    }*/
+    /**
+     * Objects that are not S7Messages are not encoded by this layer but forwarded down the pipeline, so it should output nothing itself.
+     */
+    @Test
+    public void testWriteJunk() {
+        ChannelFuture channelFuture = SUT.writeOneOutbound("Hurz");
+        Object outbound = SUT.readOutbound();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+    }
+
+    @Test
+    public void testSetupCommunication() {
+        SUT.writeOneOutbound(
+            new SetupCommunicationRequestMessage((short) 0x03, (short) 0x04, (short) 0x05, (short) 0x06, null)
+        );
+        Object outbound = SUT.readOutbound();
+
+        assertThat("The protocol layer should have output something", outbound, notNullValue());
+        assertThat("The output should have been of type DataTpdu", outbound, instanceOf(DataTpdu.class));
+
+        DataTpdu dataTpdu = (DataTpdu) outbound;
+        assertThat("The DataTpdu shouldn't have any parameters", dataTpdu.getParameters().isEmpty(), equalTo(true));
+
+        byte[] actUserData = new byte[dataTpdu.getUserData().readableBytes()];
+        dataTpdu.getUserData().readBytes(actUserData);
+
+        byte[] refUserData = toByteArray(new int[] {
+            // Protocol Id: 0x32 => S7Comm
+            0x32,
+            // MessageType.JOB
+            0x01,
+            0x00, 0x00,
+            // Pdu Reference = 3
+            0x00, 0x03,
+            // Parameter Length = 8
+            0x00, 0x08,
+            // Payload Length = 0
+            0x00, 0x00,
+            // ParameterType.SETUP_COMMUNICATION
+            0xf0,
+            // Reserved
+            0x00,
+            // Max AMQ Calling = 0x04
+            0x00, 0x04,
+            // Max AMQ Callee = 0x05
+            0x00, 0x05,
+            // PDU Size = 0x06
+            0x00, 0x06});
+
+        assertThat("Output generated by the current layer doesn't match the expected output",
+            Arrays.equals(actUserData, refUserData), equalTo(true));
+    }
+
+    @Test
+    public void testCpuServices() {
+        SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.USER_DATA, (short) 2,
+                Collections.singletonList(new CpuServicesRequestParameter(
+                    CpuServicesParameterFunctionGroup.CPU_FUNCTIONS,
+                    CpuServicesParameterSubFunctionGroup.READ_SSL, (byte) 0)),
+                Collections.singletonList(new CpuServicesPayload(DataTransportErrorCode.OK, SslId.MODULE_IDENTIFICATION,
+                    (short) 0x0000)), null));
+        Object outbound = SUT.readOutbound();
+
+        assertThat("The protocol layer should have output something", outbound, notNullValue());
+        assertThat("The output should have been of type DataTpdu", outbound, instanceOf(DataTpdu.class));
+
+        DataTpdu dataTpdu = (DataTpdu) outbound;
+        assertThat("The DataTpdu shouldn't have any parameters", dataTpdu.getParameters().isEmpty(), equalTo(true));
+
+        byte[] actUserData = new byte[dataTpdu.getUserData().readableBytes()];
+        dataTpdu.getUserData().readBytes(actUserData);
+        outputArray(actUserData);
+        byte[] refUserData = toByteArray(new int[] {
+            // Protocol Id: 0x32 => S7Comm
+            0x32,
+            // MessageType.USER_DATA
+            0x07,
+            // Reserved
+            0x00, 0x00,
+            // Pdu Reference = 2
+            0x00, 0x02,
+            // Parameter Length = 8
+            0x00, 0x08,
+            // Payload Length = 8
+            0x00, 0x08,
+            // ParameterType.CPU_SERVICES
+            0x00,
+            // ???
+            0x01, 0x12,
+            // Parameter Length
+            0x04,
+            // Type Request
+            0x11,
+            // Type: Request = 0x4 & Subtype: CPU functions = 0x4
+            0x44,
+            // Sub-function: Read SZL
+            0x01,
+            // Sequence Number: 0x00
+            0x00,
+            //////// Payload
+            // Return code: Success
+            0xFF,
+            // Transport Size
+            0x09,
+            // Length
+            0x00, 0x04,
+            // SZL Id
+            0x00, 0x11,
+            // SZL Index
+            0x00, 0x00
+        });
+
+        assertThat("Output generated by the current layer doesn't match the expected output",
+            Arrays.equals(actUserData, refUserData), equalTo(true));
+    }
+
+    @Test
+    public void testReadVar() {
+        SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1, Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR, Collections.singletonList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0)))),
+                null, null));
+        Object outbound = SUT.readOutbound();
+
+        assertThat("The protocol layer should have output something", outbound, notNullValue());
+        assertThat("The output should have been of type DataTpdu", outbound, instanceOf(DataTpdu.class));
+
+        DataTpdu dataTpdu = (DataTpdu) outbound;
+        assertThat("The DataTpdu shouldn't have any parameters", dataTpdu.getParameters().isEmpty(), equalTo(true));
+
+        byte[] actUserData = new byte[dataTpdu.getUserData().readableBytes()];
+        dataTpdu.getUserData().readBytes(actUserData);
+//        outputArray(actUserData);
+        byte[] refUserData = toByteArray(new int[] {
+            // Protocol Id: 0x32 => S7Comm
+            0x32,
+            // MessageType.JOB
+            0x01,
+            0x00, 0x00,
+            // Pdu Reference = 1
+            0x00, 0x01,
+            // Parameter Length = 14
+            0x00, 0x0e,
+            // Payload Length = 0
+            0x00, 0x00,
+            // ParameterType.READ_VAR
+            0x04,
+            // Number of items = 1
+            0x01,
+                // SpecificationType.VARIABLE_SPECIFICATION
+                0x12,
+                // Variable specification length = 10
+                0x0a,
+                // S7Any type of item
+                0x10,
+                // TransportSize.BYTE = 0x02
+                0x02,
+                // Number of items = 1
+                0x00, 0x01,
+                // DB Number = 2
+                0x00, 0x02,
+                // MemoryArea.DATA_BLOCKS = 0x84
+                0x84,
+                // Address: 00000000 00000000 00011000
+                //          -----------------------...
+                //          byte address           bit
+                0x00, 0x00, 0x18});
+
+        assertThat("Output generated by the current layer doesn't match the expected output",
+            Arrays.equals(actUserData, refUserData), equalTo(true));
+    }
+
+    @Test
+    public void testWriteVar() {
+        SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1, Collections.singletonList(
+                new VarParameter(ParameterType.WRITE_VAR, Collections.singletonList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0)))),
+                Collections.singletonList(
+                    new VarPayload(ParameterType.WRITE_VAR, Collections.singletonList(
+                        new VarPayloadItem(DataTransportErrorCode.OK, DataTransportSize.BYTE_WORD_DWORD, new byte[]{(byte) 0x0A})))
+                ), null));
+        Object outbound = SUT.readOutbound();
+
+        assertThat("The protocol layer should have output something", outbound, notNullValue());
+        assertThat("The output should have been of type DataTpdu", outbound, instanceOf(DataTpdu.class));
+
+        DataTpdu dataTpdu = (DataTpdu) outbound;
+        assertThat("The DataTpdu shouldn't have any parameters", dataTpdu.getParameters().isEmpty(), equalTo(true));
+
+        byte[] actUserData = new byte[dataTpdu.getUserData().readableBytes()];
+        dataTpdu.getUserData().readBytes(actUserData);
+
+        byte[] refUserData = toByteArray(new int[] {
+            // Protocol Id: 0x32 => S7Comm
+            0x32,
+            // MessageType.JOB
+            0x01,
+            0x00, 0x00,
+            // Pdu Reference = 1
+            0x00, 0x01,
+            // Parameter Length = 14
+            0x00, 0x0e,
+            // Payload Length = 5
+            0x00, 0x05,
+            // ParameterType.WRITE_VAR
+            0x05,
+            // Number of items = 1
+            0x01,
+            /////////////////////////////////////////////////////
+            // Parameters ...
+            /////////////////////////////////////////////////////
+            // SpecificationType.VARIABLE_SPECIFICATION
+            0x12,
+            // Variable specification length = 10
+            0x0a,
+            // S7Any type of item
+            0x10,
+            // TransportSize.BYTE = 0x02
+            0x02,
+            // Number of items = 1
+            0x00, 0x01,
+            // DB Number = 2
+            0x00, 0x02,
+            // MemoryArea.DATA_BLOCKS = 0x84
+            0x84,
+            // Address: 00000000 00000000 00011000
+            //          -----------------------...
+            //          byte address           bit
+            0x00, 0x00, 0x18,
+            /////////////////////////////////////////////////////
+            // Payloads
+            /////////////////////////////////////////////////////
+            // DataTransportErrorCode.OK
+            0xff,
+            // DataTransportSize.BYTE_WORD_DWORD
+            0x04,
+            // Length = 1
+            0x00, 0x01,
+            // Data: 0x0A
+            0x0a
+        });
+
+        assertThat("Output generated by the current layer doesn't match the expected output",
+            Arrays.equals(actUserData, refUserData), equalTo(true));
+    }
+
+    @Test
+    public void testTooBigTpdu() {
+        ChannelFuture channelFuture = SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1, Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR, Arrays.asList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0),
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0),
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0),
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0),
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, 1, (short) 2, (short) 3, (byte) 0)))),
+                null, null));
+        Object outbound = SUT.readOutbound();
+        Throwable exception = channelFuture.cause();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, notNullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, instanceOf(PlcProtocolException.class));
+    }
+
+    @Test
+    public void testNotImplementedParameter() {
+        ChannelFuture channelFuture = SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1, Collections.singletonList(
+                new VarParameter(ParameterType.UPLOAD, Collections.emptyList())),
+                null, null));
+        Object outbound = SUT.readOutbound();
+        Throwable exception = channelFuture.cause();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, notNullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, instanceOf(PlcProtocolException.class));
+    }
 
     @Test
-    @Category(FastTests.class)
-    public void decode() {
-        //TODO: finish me
-        LinkedList<Object> out = new LinkedList<>();
-        ByteBuf buffer = Unpooled.buffer();
-        // Magic Number
-        buffer.writeByte(0x32);
-        buffer.writeByte(MessageType.JOB.getCode());
-        // Reserved magic value
-        buffer.writeShort(0x0000);
-        // tpduReference
-        buffer.writeShort(0x0000);
-        // headerParametersLength
-        buffer.writeShort(0x0000);
-        // userDataLength
-        buffer.writeShort(0x0000);
-        SUT.decode(null, new IsoTPMessage(mock(Tpdu.class), buffer), out);
-        assertThat(out, hasSize(1));
+    public void testNotImplementedPayload() {
+        S7Payload payload  = mock(S7Payload.class);
+        when(payload.getType()).thenReturn(ParameterType.UPLOAD);
+        ChannelFuture channelFuture = SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1,
+                Collections.emptyList(),
+                Collections.singletonList(payload),
+                null));
+        Object outbound = SUT.readOutbound();
+        Throwable exception = channelFuture.cause();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, notNullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, instanceOf(PlcProtocolException.class));
+    }
+
+    @Test
+    public void testNotImplementedAddressingType() {
+        VarParameterItem varParameterItem = mock(VarParameterItem.class);
+        when(varParameterItem.getAddressingMode()).thenReturn(VariableAddressingMode.ALARM_QUERYREQ);
+        ChannelFuture channelFuture = SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.JOB, (short) 1, Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR, Collections.singletonList(varParameterItem))),
+                null, null));
+        Object outbound = SUT.readOutbound();
+        Throwable exception = channelFuture.cause();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, notNullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, instanceOf(PlcProtocolException.class));
+    }
+
+    @Test
+    public void cpuServicesRequestWithSzlRecords() {
+        ChannelFuture channelFuture = SUT.writeOneOutbound(
+            new S7RequestMessage(MessageType.USER_DATA, (short) 2,
+                Collections.singletonList(new CpuServicesRequestParameter(
+                    CpuServicesParameterFunctionGroup.CPU_FUNCTIONS,
+                    CpuServicesParameterSubFunctionGroup.READ_SSL, (byte) 0)),
+                Collections.singletonList(new CpuServicesPayload(DataTransportErrorCode.OK, SslId.MODULE_IDENTIFICATION,
+                    (short) 0x0000, Collections.singletonList(new SslModuleIdentificationDataRecord((short) 0, "hurz", (short) 0, (short) 0, (short) 0)))),
+                null));
+        Object outbound = SUT.readOutbound();
+        Throwable exception = channelFuture.cause();
+
+        assertThat("The protocol layer should not have output anything", outbound, nullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, notNullValue());
+        assertThat("The protocol layer should have thrown an exception", exception, instanceOf(PlcProtocolException.class));
+    }
+
+    private static byte[] toByteArray(int[] input) {
+        byte[] output = new byte[input.length];
+        for (int i = 0; i < input.length; i++) {
+            if((input[i] > 0xFF) || (input[i] < 0)) {
+                throw new IllegalArgumentException("Int values passed in to 'toByteArray' should be vaild byte values.");
+            }
+            output[i] = (byte) input[i];
+        }
+        return output;
+    }
+
+    private static void outputArray(byte[] input) {
+        StringBuilder sb = new StringBuilder();
+        for (byte anInput : input) {
+            if (sb.length() > 0) {
+                sb.append(", ");
+            }
+            sb.append(String.format("0x%02x", anInput));
+        }
+        System.out.println(sb.toString());
     }
 
 }
diff --git a/plc4j/protocols/s7/src/test/resources/logback.xml b/plc4j/protocols/s7/src/test/resources/logback.xml
index bba8e02..a8ddebb 100644
--- a/plc4j/protocols/s7/src/test/resources/logback.xml
+++ b/plc4j/protocols/s7/src/test/resources/logback.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="warn">
+  <root level="trace">
     <appender-ref ref="STDOUT" />
   </root>