You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/02/25 22:37:29 UTC

[plc4x] 04/04: - Worked on getting the test-channel to actually work - Implemented a first somewhat working version of the driver-testsuite execution code

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/driver-testsuite
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 4b651798775ee36ae38194b8e1e4615b14c21a02
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Feb 25 23:37:14 2020 +0100

    - Worked on getting the test-channel to actually work
    - Implemented a first somewhat working version of the driver-testsuite execution code
---
 .../io/netty/bootstrap/EventLoopProvider.java}     |  16 +-
 .../java/io/netty/bootstrap/Plc4xBootstrap.java    |  66 ++
 .../spi/connection/ChannelExposingConnection.java} |  16 +-
 .../spi/connection/DefaultNettyPlcConnection.java  |   3 +-
 .../java/spi/connection/NettyChannelFactory.java   |  10 +-
 plc4j/transports/test/pom.xml                      |   4 +
 .../channel/embedded/Plc4xEmbeddedChannel.java     | 892 +++++++++++++++++++++
 .../channel/embedded/Plc4xEmbeddedEventLoop.java   | 147 ++++
 .../java/transport/test/TestChannelFactory.java    |  11 +-
 .../services/org.apache.plc4x.java.api.PlcDriver   |  19 -
 plc4j/utils/test-utils/pom.xml                     |  32 +
 .../plc4x/test/driver/DriverTestsuiteRunner.java   | 198 ++++-
 .../plc4x/test/driver/model/DriverTestsuite.java   |  19 +-
 .../driver/model/{TestStep.java => StepType.java}  |  32 +-
 .../apache/plc4x/test/driver/model/TestStep.java   |   8 +-
 .../main/resources/schemas/driver-testsuite.xsd    |  33 +-
 16 files changed, 1405 insertions(+), 101 deletions(-)

diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
similarity index 73%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
index 92e281c..1e2877b 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
@@ -16,20 +16,12 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.test.driver.model;
+package io.netty.bootstrap;
 
-import org.dom4j.Element;
+import io.netty.channel.EventLoop;
 
-public class TestStep {
+public interface EventLoopProvider {
 
-    private final Element payload;
-
-    public TestStep(Element payload) {
-        this.payload = payload;
-    }
-
-    public Element getPayload() {
-        return payload;
-    }
+    EventLoop getEventLoop();
 
 }
diff --git a/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java b/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java
new file mode 100644
index 0000000..133f2cb
--- /dev/null
+++ b/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package io.netty.bootstrap;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+
+import java.net.SocketAddress;
+
+public class Plc4xBootstrap extends Bootstrap {
+
+    @Override
+    public Bootstrap validate() {
+        if(channelFactory() != null) {
+             if(channelFactory().toString().contains("EmbeddedChannel")) {
+                 if (config().handler() == null) {
+                     throw new IllegalStateException("handler not set");
+                 }
+             } else {
+                 return super.validate();
+             }
+        } else {
+            return super.validate();
+        }
+        return this;
+    }
+
+    @Override
+    void init(Channel channel) throws Exception {
+        if((group == null) && (channel instanceof EventLoopProvider)) {
+            group = ((EventLoopProvider) channel).getEventLoop();
+        }
+        super.init(channel);
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+        final ChannelFuture connectFuture = super.connect(remoteAddress);
+
+        if("Plc4xEmbeddedEventLoop".equals(group.getClass().getSimpleName())) {
+            if (connectFuture instanceof ChannelPromise) {
+                ChannelPromise connectPromise = (ChannelPromise) connectFuture;
+                connectPromise.setSuccess();
+            }
+        }
+        return connectFuture;
+    }
+
+}
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
similarity index 73%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
index 92e281c..a9517c2 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
@@ -16,20 +16,12 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.test.driver.model;
+package org.apache.plc4x.java.spi.connection;
 
-import org.dom4j.Element;
+import io.netty.channel.Channel;
 
-public class TestStep {
+public interface ChannelExposingConnection {
 
-    private final Element payload;
-
-    public TestStep(Element payload) {
-        this.payload = payload;
-    }
-
-    public Element getPayload() {
-        return payload;
-    }
+    Channel getChannel();
 
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index 05e1b70..855bae9 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-public class DefaultNettyPlcConnection extends AbstractPlcConnection {
+public class DefaultNettyPlcConnection extends AbstractPlcConnection implements ChannelExposingConnection {
 
     /**
      * a {@link HashedWheelTimer} shall be only instantiated once.
@@ -145,6 +145,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection {
         return connected && channel.isActive();
     }
 
+    @Override
     public Channel getChannel() {
         return channel;
     }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
index 2d227c1..8bfc55a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
@@ -20,14 +20,13 @@
 package org.apache.plc4x.java.spi.connection;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.Plc4xBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.spi.configuration.ConfigurationFactory;
-import org.apache.plc4x.java.spi.configuration.HasConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,10 +85,13 @@ public abstract class NettyChannelFactory implements ChannelFactory {
     @Override
     public Channel createChannel(ChannelHandler channelHandler) throws PlcConnectionException {
         try {
+            Bootstrap bootstrap = new Plc4xBootstrap();
+
             final EventLoopGroup workerGroup = getEventLoopGroup();
+            if(workerGroup != null) {
+                bootstrap.group(workerGroup);
+            }
 
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(workerGroup);
             bootstrap.channel(getChannel());
             // Callback to allow subclasses to modify the Bootstrap
 
diff --git a/plc4j/transports/test/pom.xml b/plc4j/transports/test/pom.xml
index cac99e6..0b3b767 100644
--- a/plc4j/transports/test/pom.xml
+++ b/plc4j/transports/test/pom.xml
@@ -40,6 +40,10 @@
 
     <dependency>
       <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
     </dependency>
 
diff --git a/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java
new file mode 100644
index 0000000..8241852
--- /dev/null
+++ b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java
@@ -0,0 +1,892 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.embedded;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import io.netty.bootstrap.EventLoopProvider;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.DefaultChannelPipeline;
+import io.netty.channel.EventLoop;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.RecyclableArrayList;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+/**
+ * Base class for {@link Channel} implementations that are used in an embedded fashion.
+ */
+public class Plc4xEmbeddedChannel extends AbstractChannel implements EventLoopProvider {
+
+    private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
+    private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
+
+    private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
+    private enum State { OPEN, ACTIVE, CLOSED }
+
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
+
+    private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
+    private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
+
+    private final Plc4xEmbeddedEventLoop loop = new Plc4xEmbeddedEventLoop();
+    private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            recordException(future);
+        }
+    };
+
+    private final ChannelMetadata metadata;
+    private final ChannelConfig config;
+
+    private Queue<Object> inboundMessages;
+    private Queue<Object> outboundMessages;
+    private Throwable lastException;
+    private State state;
+
+    /**
+     * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
+     */
+    public Plc4xEmbeddedChannel() {
+        this(EMPTY_HANDLERS);
+    }
+
+    /**
+     * Create a new instance with the specified ID and an empty pipeline.
+     *
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     */
+    public Plc4xEmbeddedChannel(ChannelId channelId) {
+        this(channelId, EMPTY_HANDLERS);
+    }
+
+    /**
+     * Create a new instance with the pipeline initialized with the specified handlers.
+     *
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(ChannelHandler... handlers) {
+        this(EmbeddedChannelId.INSTANCE, handlers);
+    }
+
+    /**
+     * Create a new instance with the pipeline initialized with the specified handlers.
+     *
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
+        this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
+    }
+
+    /**
+     * Create a new instance with the pipeline initialized with the specified handlers.
+     *
+     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+     *                 constructor. If {@code false} the user will need to call {@link #register()}.
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
+        this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
+    }
+
+    /**
+     * Create a new instance with the channel ID set to the given ID and the pipeline
+     * initialized with the specified handlers.
+     *
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
+        this(channelId, false, handlers);
+    }
+
+    /**
+     * Create a new instance with the channel ID set to the given ID and the pipeline
+     * initialized with the specified handlers.
+     *
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
+        this(channelId, true, hasDisconnect, handlers);
+    }
+
+    /**
+     * Create a new instance with the channel ID set to the given ID and the pipeline
+     * initialized with the specified handlers.
+     *
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+     *                 constructor. If {@code false} the user will need to call {@link #register()}.
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
+                           ChannelHandler... handlers) {
+        this(null, channelId, register, hasDisconnect, handlers);
+    }
+
+    /**
+     * Create a new instance with the channel ID set to the given ID and the pipeline
+     * initialized with the specified handlers.
+     *
+     * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+     *                 constructor. If {@code false} the user will need to call {@link #register()}.
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
+                           final ChannelHandler... handlers) {
+        super(parent, channelId);
+        metadata = metadata(hasDisconnect);
+        config = new DefaultChannelConfig(this);
+        setup(register, handlers);
+    }
+
+    /**
+     * Create a new instance with the channel ID set to the given ID and the pipeline
+     * initialized with the specified handlers.
+     *
+     * @param channelId the {@link ChannelId} that will be used to identify this channel
+     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+     *                      to {@link #close()}, {@code true} otherwise.
+     * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
+     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+     */
+    public Plc4xEmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
+                           final ChannelHandler... handlers) {
+        super(null, channelId);
+        metadata = metadata(hasDisconnect);
+        this.config = ObjectUtil.checkNotNull(config, "config");
+        setup(true, handlers);
+    }
+
+    @Override
+    public EventLoop getEventLoop() {
+        return loop;
+    }
+
+    @Override
+    public boolean isRegistered() {
+        return false;
+    }
+
+    private static ChannelMetadata metadata(boolean hasDisconnect) {
+        return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
+    }
+
+    private void setup(boolean register, final ChannelHandler... handlers) {
+        ObjectUtil.checkNotNull(handlers, "handlers");
+        ChannelPipeline p = pipeline();
+        p.addLast(new ChannelInitializer<Channel>() {
+            @Override
+            protected void initChannel(Channel ch) throws Exception {
+                ChannelPipeline pipeline = ch.pipeline();
+                for (ChannelHandler h: handlers) {
+                    if (h == null) {
+                        break;
+                    }
+                    pipeline.addLast(h);
+                }
+            }
+        });
+        if (register) {
+            ChannelFuture future = loop.register(this);
+            assert future.isDone();
+        }
+    }
+
+    /**
+     * Register this {@code Channel} on its {@link EventLoop}.
+     */
+    public void register() throws Exception {
+        ChannelFuture future = loop.register(this);
+        assert future.isDone();
+        Throwable cause = future.cause();
+        if (cause != null) {
+            PlatformDependent.throwException(cause);
+        }
+    }
+
+    @Override
+    protected final DefaultChannelPipeline newChannelPipeline() {
+        return new EmbeddedChannelPipeline(this);
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return metadata;
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return config;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return state != State.CLOSED;
+    }
+
+    @Override
+    public boolean isActive() {
+        return state == State.ACTIVE;
+    }
+
+    /**
+     * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
+     */
+    public Queue<Object> inboundMessages() {
+        if (inboundMessages == null) {
+            inboundMessages = new ArrayDeque<Object>();
+        }
+        return inboundMessages;
+    }
+
+    /**
+     * @deprecated use {@link #inboundMessages()}
+     */
+    @Deprecated
+    public Queue<Object> lastInboundBuffer() {
+        return inboundMessages();
+    }
+
+    /**
+     * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
+     */
+    public Queue<Object> outboundMessages() {
+        if (outboundMessages == null) {
+            outboundMessages = new ArrayDeque<Object>();
+        }
+        return outboundMessages;
+    }
+
+    /**
+     * @deprecated use {@link #outboundMessages()}
+     */
+    @Deprecated
+    public Queue<Object> lastOutboundBuffer() {
+        return outboundMessages();
+    }
+
+    /**
+     * Return received data from this {@link Channel}. This may return {@code null} if nothing is readable.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T readInbound() {
+        T message = (T) poll(inboundMessages);
+        if (message != null) {
+            ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
+        }
+        return message;
+    }
+
+    /**
+     * Read data from the outbound. This may return {@code null} if nothing is readable.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T readOutbound() {
+        T message =  (T) poll(outboundMessages);
+        if (message != null) {
+            ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
+        }
+        return message;
+    }
+
+    /**
+     * Write messages to the inbound of this {@link Channel}.
+     *
+     * @param msgs the messages to be written
+     *
+     * @return {@code true} if the write operation did add something to the inbound buffer
+     */
+    public boolean writeInbound(Object... msgs) {
+        ensureOpen();
+        if (msgs.length == 0) {
+            return isNotEmpty(inboundMessages);
+        }
+
+        ChannelPipeline p = pipeline();
+        for (Object m: msgs) {
+            p.fireChannelRead(m);
+        }
+
+        flushInbound(false, voidPromise());
+        return isNotEmpty(inboundMessages);
+    }
+
+    /**
+     * Writes one message to the inbound of this {@link Channel} and does not flush it. This
+     * method is conceptually equivalent to {@link #write(Object)}.
+     *
+     * @see #writeOneOutbound(Object)
+     */
+    public ChannelFuture writeOneInbound(Object msg) {
+        return writeOneInbound(msg, newPromise());
+    }
+
+    /**
+     * Writes one message to the inbound of this {@link Channel} and does not flush it. This
+     * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
+     *
+     * @see #writeOneOutbound(Object, ChannelPromise)
+     */
+    public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
+        if (checkOpen(true)) {
+            pipeline().fireChannelRead(msg);
+        }
+        return checkException(promise);
+    }
+
+    /**
+     * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
+     *
+     * @see #flushOutbound()
+     */
+    public Plc4xEmbeddedChannel flushInbound() {
+        flushInbound(true, voidPromise());
+        return this;
+    }
+
+    private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
+        if (checkOpen(recordException)) {
+            pipeline().fireChannelReadComplete();
+            runPendingTasks();
+        }
+
+        return checkException(promise);
+    }
+
+    /**
+     * Write messages to the outbound of this {@link Channel}.
+     *
+     * @param msgs              the messages to be written
+     * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
+     */
+    public boolean writeOutbound(Object... msgs) {
+        ensureOpen();
+        if (msgs.length == 0) {
+            return isNotEmpty(outboundMessages);
+        }
+
+        RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
+        try {
+            for (Object m: msgs) {
+                if (m == null) {
+                    break;
+                }
+                futures.add(write(m));
+            }
+
+            flushOutbound0();
+
+            int size = futures.size();
+            for (int i = 0; i < size; i++) {
+                ChannelFuture future = (ChannelFuture) futures.get(i);
+                if (future.isDone()) {
+                    recordException(future);
+                } else {
+                    // The write may be delayed to run later by runPendingTasks()
+                    future.addListener(recordExceptionListener);
+                }
+            }
+
+            checkException();
+            return isNotEmpty(outboundMessages);
+        } finally {
+            futures.recycle();
+        }
+    }
+
+    /**
+     * Writes one message to the outbound of this {@link Channel} and does not flush it. This
+     * method is conceptually equivalent to {@link #write(Object)}.
+     *
+     * @see #writeOneInbound(Object)
+     */
+    public ChannelFuture writeOneOutbound(Object msg) {
+        return writeOneOutbound(msg, newPromise());
+    }
+
+    /**
+     * Writes one message to the outbound of this {@link Channel} and does not flush it. This
+     * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
+     *
+     * @see #writeOneInbound(Object, ChannelPromise)
+     */
+    public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
+        if (checkOpen(true)) {
+            return write(msg, promise);
+        }
+        return checkException(promise);
+    }
+
+    /**
+     * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
+     *
+     * @see #flushInbound()
+     */
+    public Plc4xEmbeddedChannel flushOutbound() {
+        if (checkOpen(true)) {
+            flushOutbound0();
+        }
+        checkException(voidPromise());
+        return this;
+    }
+
+    private void flushOutbound0() {
+        // We need to call runPendingTasks first as a ChannelOutboundHandler may have used eventloop.execute(...) to
+        // delay the write on the next eventloop run.
+        runPendingTasks();
+
+        flush();
+    }
+
+    /**
+     * Mark this {@link Channel} as finished. Any further attempt to write data to it will fail.
+     *
+     * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+     */
+    public boolean finish() {
+        return finish(false);
+    }
+
+    /**
+     * Mark this {@link Channel} as finished and release all pending messages in the inbound and outbound buffer.
+     * Any further attempt to write data to it will fail.
+     *
+     * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+     */
+    public boolean finishAndReleaseAll() {
+        return finish(true);
+    }
+
+    /**
+     * Mark this {@link Channel} as finished. Any further attempt to write data to it will fail.
+     *
+     * @param releaseAll if {@code true} all pending messages in the inbound and outbound buffer are released.
+     * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+     */
+    private boolean finish(boolean releaseAll) {
+        close();
+        try {
+            checkException();
+            return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
+        } finally {
+            if (releaseAll) {
+                releaseAll(inboundMessages);
+                releaseAll(outboundMessages);
+            }
+        }
+    }
+
+    /**
+     * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
+     * otherwise.
+     */
+    public boolean releaseInbound() {
+        return releaseAll(inboundMessages);
+    }
+
+    /**
+     * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
+     * otherwise.
+     */
+    public boolean releaseOutbound() {
+        return releaseAll(outboundMessages);
+    }
+
+    private static boolean releaseAll(Queue<Object> queue) {
+        if (isNotEmpty(queue)) {
+            for (;;) {
+                Object msg = queue.poll();
+                if (msg == null) {
+                    break;
+                }
+                ReferenceCountUtil.release(msg);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private void finishPendingTasks(boolean cancel) {
+        runPendingTasks();
+        if (cancel) {
+            // Cancel all scheduled tasks that are left.
+            loop.cancelScheduledTasks();
+        }
+    }
+
+    @Override
+    public final ChannelFuture close() {
+        return close(newPromise());
+    }
+
+    @Override
+    public final ChannelFuture disconnect() {
+        return disconnect(newPromise());
+    }
+
+    @Override
+    public final ChannelFuture close(ChannelPromise promise) {
+        // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
+        // that needs to be run before the actual close takes place.
+        runPendingTasks();
+        ChannelFuture future = super.close(promise);
+
+        // Now finish everything else and cancel all scheduled tasks that were not yet ready to run.
+        finishPendingTasks(true);
+        return future;
+    }
+
+    /**
+     * Disconnects the channel and runs the tasks that are pending afterwards. Remaining scheduled tasks are
+     * cancelled only if the channel metadata reports that it has no separate disconnect operation.
+     *
+     * @param promise the promise handed to the superclass disconnect operation
+     * @return the future returned by the superclass disconnect operation
+     */
+    @Override
+    public final ChannelFuture disconnect(ChannelPromise promise) {
+        final ChannelFuture disconnectFuture = super.disconnect(promise);
+        final boolean cancelScheduled = !metadata.hasDisconnect();
+        finishPendingTasks(cancelScheduled);
+        return disconnectFuture;
+    }
+
+    /**
+     * Null-safe check whether a queue holds at least one element.
+     *
+     * @param queue the queue to inspect; may be {@code null}
+     * @return {@code true} only if the queue is non-null and contains elements
+     */
+    private static boolean isNotEmpty(Queue<Object> queue) {
+        if (queue == null) {
+            return false;
+        }
+        return !queue.isEmpty();
+    }
+
+    /**
+     * Null-safe variant of {@link Queue#poll()}.
+     *
+     * @param queue the queue to poll; may be {@code null}
+     * @return the head of the queue, or {@code null} if the queue is {@code null} or empty
+     */
+    private static Object poll(Queue<Object> queue) {
+        if (queue == null) {
+            return null;
+        }
+        return queue.poll();
+    }
+
+    /**
+     * Runs the tasks and then the scheduled tasks that are pending in the {@link EventLoop} of this
+     * {@link Channel}. An {@link Exception} thrown by either phase is recorded instead of being propagated,
+     * so a failure in the first phase does not prevent the second one from running.
+     */
+    public void runPendingTasks() {
+        // Phase 1: tasks handed to the event loop for immediate execution.
+        try {
+            loop.runTasks();
+        } catch (Exception taskFailure) {
+            recordException(taskFailure);
+        }
+
+        // Phase 2: scheduled tasks.
+        try {
+            loop.runScheduledTasks();
+        } catch (Exception scheduledFailure) {
+            recordException(scheduledFailure);
+        }
+    }
+
+    /**
+     * Runs the pending scheduled tasks in the {@link EventLoop} of this {@link Channel}.
+     * If running them throws an {@link Exception}, it is recorded and the event loop is asked for the
+     * next scheduled task instead.
+     *
+     * @return the {@code nanoseconds} when the next scheduled task is ready to run, or {@code -1} if no
+     *         other task was scheduled
+     */
+    public long runScheduledPendingTasks() {
+        long nextTaskNanos;
+        try {
+            nextTaskNanos = loop.runScheduledTasks();
+        } catch (Exception failure) {
+            recordException(failure);
+            nextTaskNanos = loop.nextScheduledTask();
+        }
+        return nextTaskNanos;
+    }
+
+    /**
+     * Records the cause of the given future if the future did not succeed.
+     *
+     * @param future the future to inspect
+     */
+    private void recordException(ChannelFuture future) {
+        if (future.isSuccess()) {
+            return;
+        }
+        recordException(future.cause());
+    }
+
+    /**
+     * Remembers the given cause as the last exception. If one is already stored, the stored one is kept
+     * and the new cause is only logged as a warning.
+     *
+     * @param cause the exception to record
+     */
+    private void recordException(Throwable cause) {
+        if (lastException != null) {
+            logger.warn(
+                "More than one exception was raised. " +
+                    "Will report only the first one and log others.", cause);
+            return;
+        }
+        lastException = cause;
+    }
+
+    /**
+     * Completes the given promise according to whether an exception has been recorded.
+     * A recorded exception is cleared; it is rethrown if the promise is a void promise, otherwise the
+     * promise is failed with it. Without a recorded exception the promise is marked as successful.
+     *
+     * @param promise the promise to complete
+     * @return the completed promise
+     */
+    private ChannelFuture checkException(ChannelPromise promise) {
+        final Throwable recorded = lastException;
+        if (recorded == null) {
+            return promise.setSuccess();
+        }
+
+        lastException = null;
+        if (promise.isVoid()) {
+            PlatformDependent.throwException(recorded);
+        }
+        return promise.setFailure(recorded);
+    }
+
+    /**
+     * Rethrows the recorded {@link Throwable}, if there is one. Implemented by running the
+     * promise-based check with the void promise.
+     */
+    public void checkException() {
+        final ChannelPromise rethrowingPromise = voidPromise();
+        checkException(rethrowingPromise);
+    }
+
+    /**
+     * Tells whether the {@link Channel} is open.
+     *
+     * @param recordException if {@code true} and the channel is not open, a
+     *                        {@link ClosedChannelException} is recorded
+     * @return {@code true} if the channel is open, {@code false} otherwise
+     */
+    private boolean checkOpen(boolean recordException) {
+        if (isOpen()) {
+            return true;
+        }
+        if (recordException) {
+            recordException(new ClosedChannelException());
+        }
+        return false;
+    }
+
+    /**
+     * Verifies that the {@link Channel} is open. If it is not, a closed-channel exception is recorded
+     * and the recorded exception is then rethrown through {@link #checkException()}.
+     */
+    protected final void ensureOpen() {
+        final boolean open = checkOpen(true);
+        if (open) {
+            return;
+        }
+        checkException();
+    }
+
+    /**
+     * Accepts only event loops of type {@link Plc4xEmbeddedEventLoop}.
+     *
+     * @param loop the event loop to check
+     * @return {@code true} if {@code loop} is a {@link Plc4xEmbeddedEventLoop}
+     */
+    @Override
+    protected boolean isCompatible(EventLoop loop) {
+        final boolean embeddedLoop = loop instanceof Plc4xEmbeddedEventLoop;
+        return embeddedLoop;
+    }
+
+    /**
+     * @return {@code LOCAL_ADDRESS} while the channel is active, {@code null} otherwise
+     */
+    @Override
+    protected SocketAddress localAddress0() {
+        if (!isActive()) {
+            return null;
+        }
+        return LOCAL_ADDRESS;
+    }
+
+    /**
+     * @return {@code REMOTE_ADDRESS} while the channel is active, {@code null} otherwise
+     */
+    @Override
+    protected SocketAddress remoteAddress0() {
+        if (!isActive()) {
+            return null;
+        }
+        return REMOTE_ADDRESS;
+    }
+
+    /**
+     * Registration needs no real work here; it only switches the channel state to {@code ACTIVE}.
+     */
+    @Override
+    protected void doRegister() throws Exception {
+        this.state = State.ACTIVE;
+    }
+
+    /**
+     * Binding is intentionally a no-op: the given address is ignored.
+     *
+     * @param localAddress ignored
+     */
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+        // NOOP
+    }
+
+    /**
+     * Disconnecting is treated as a close when the channel metadata reports that there is no separate
+     * disconnect operation; otherwise nothing happens here.
+     */
+    @Override
+    protected void doDisconnect() throws Exception {
+        if (metadata.hasDisconnect()) {
+            return;
+        }
+        doClose();
+    }
+
+    /**
+     * Closing only switches the channel state to {@code CLOSED}.
+     */
+    @Override
+    protected void doClose() throws Exception {
+        this.state = State.CLOSED;
+    }
+
+    /**
+     * Starting a read is intentionally a no-op for this channel.
+     */
+    @Override
+    protected void doBeginRead() throws Exception {
+        // NOOP
+    }
+
+    /**
+     * @return a new {@link EmbeddedUnsafe} instance for this channel
+     */
+    @Override
+    protected AbstractUnsafe newUnsafe() {
+        final EmbeddedUnsafe embeddedUnsafe = new EmbeddedUnsafe();
+        return embeddedUnsafe;
+    }
+
+    /**
+     * Exposes the {@code wrapped} view of the channel's {@link EmbeddedUnsafe} rather than the
+     * {@link EmbeddedUnsafe} itself.
+     *
+     * @return the wrapping {@link Unsafe}
+     */
+    @Override
+    public Unsafe unsafe() {
+        final EmbeddedUnsafe embeddedUnsafe = (EmbeddedUnsafe) super.unsafe();
+        return embeddedUnsafe.wrapped;
+    }
+
+    /**
+     * Moves every message of the outbound buffer to {@link #handleOutboundMessage(Object)}.
+     * Each message is retained before it is handed over and then removed from the buffer.
+     *
+     * @param in the outbound buffer to drain
+     */
+    @Override
+    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+        Object current;
+        while ((current = in.current()) != null) {
+            ReferenceCountUtil.retain(current);
+            handleOutboundMessage(current);
+            in.remove();
+        }
+    }
+
+    /**
+     * Called for each outbound message; this implementation appends the message to the queue
+     * returned by {@code outboundMessages()}.
+     *
+     * @param msg the outbound message
+     * @see #doWrite(ChannelOutboundBuffer)
+     */
+    protected void handleOutboundMessage(Object msg) {
+        outboundMessages().add(msg);
+    }
+
+    /**
+     * Called for each inbound message; this implementation appends the message to the queue
+     * returned by {@code inboundMessages()}.
+     *
+     * @param msg the inbound message
+     */
+    protected void handleInboundMessage(Object msg) {
+        inboundMessages().add(msg);
+    }
+
+    /**
+     * The {@link AbstractUnsafe} of this channel. Its {@code connect} completes immediately, and it
+     * carries a {@code wrapped} {@link Unsafe} that forwards every call to this instance.
+     */
+    private final class EmbeddedUnsafe extends AbstractUnsafe {
+
+        // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
+        // that may change the state of the Channel and may schedule tasks for later execution.
+        // Pure accessors (allocator handle, addresses, void promise, outbound buffer) are forwarded
+        // without running pending tasks.
+        final Unsafe wrapped = new Unsafe() {
+            @Override
+            public RecvByteBufAllocator.Handle recvBufAllocHandle() {
+                return EmbeddedUnsafe.this.recvBufAllocHandle();
+            }
+
+            @Override
+            public SocketAddress localAddress() {
+                return EmbeddedUnsafe.this.localAddress();
+            }
+
+            @Override
+            public SocketAddress remoteAddress() {
+                return EmbeddedUnsafe.this.remoteAddress();
+            }
+
+            @Override
+            public void register(EventLoop eventLoop, ChannelPromise promise) {
+                EmbeddedUnsafe.this.register(eventLoop, promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void bind(SocketAddress localAddress, ChannelPromise promise) {
+                EmbeddedUnsafe.this.bind(localAddress, promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+                EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void disconnect(ChannelPromise promise) {
+                EmbeddedUnsafe.this.disconnect(promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void close(ChannelPromise promise) {
+                EmbeddedUnsafe.this.close(promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void closeForcibly() {
+                EmbeddedUnsafe.this.closeForcibly();
+                runPendingTasks();
+            }
+
+            @Override
+            public void deregister(ChannelPromise promise) {
+                EmbeddedUnsafe.this.deregister(promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void beginRead() {
+                EmbeddedUnsafe.this.beginRead();
+                runPendingTasks();
+            }
+
+            @Override
+            public void write(Object msg, ChannelPromise promise) {
+                EmbeddedUnsafe.this.write(msg, promise);
+                runPendingTasks();
+            }
+
+            @Override
+            public void flush() {
+                EmbeddedUnsafe.this.flush();
+                runPendingTasks();
+            }
+
+            @Override
+            public ChannelPromise voidPromise() {
+                return EmbeddedUnsafe.this.voidPromise();
+            }
+
+            @Override
+            public ChannelOutboundBuffer outboundBuffer() {
+                return EmbeddedUnsafe.this.outboundBuffer();
+            }
+        };
+
+        /**
+         * Connecting does no work: both addresses are ignored and the promise is marked as successful.
+         */
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            safeSetSuccess(promise);
+        }
+    }
+
+    /**
+     * Pipeline whose tail hands unhandled inbound events back to the enclosing channel: exceptions are
+     * recorded and messages are passed to {@code handleInboundMessage}.
+     */
+    private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
+        EmbeddedChannelPipeline(Plc4xEmbeddedChannel channel) {
+            super(channel);
+        }
+
+        /** Records an exception that no handler in the pipeline dealt with. */
+        @Override
+        protected void onUnhandledInboundException(Throwable cause) {
+            Plc4xEmbeddedChannel.this.recordException(cause);
+        }
+
+        /** Forwards a message that reached the end of the pipeline to the enclosing channel. */
+        @Override
+        protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
+            Plc4xEmbeddedChannel.this.handleInboundMessage(msg);
+        }
+    }
+}
diff --git a/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java
new file mode 100644
index 0000000..d1aa600
--- /dev/null
+++ b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.embedded;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.*;
+import io.netty.util.internal.ObjectUtil;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Event loop that never runs anything on its own: submitted tasks are only queued and are executed
+ * when {@link #runTasks()} or {@link #runScheduledTasks()} is called. It always reports the caller as
+ * being inside the event loop and does not support shutdown.
+ */
+final class Plc4xEmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
+
+    // Tasks handed to execute(); they stay here until runTasks() drains the queue.
+    private final Queue<Runnable> tasks = new ArrayDeque<>(2);
+    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
+
+    @Override
+    public EventLoopGroup parent() {
+        final EventLoopGroup parentGroup = (EventLoopGroup) super.parent();
+        return parentGroup;
+    }
+
+    @Override
+    public EventLoop next() {
+        final EventLoop nextLoop = (EventLoop) super.next();
+        return nextLoop;
+    }
+
+    /** Queues the command; it is not executed here. */
+    @Override
+    public void execute(Runnable command) {
+        final Runnable checked = ObjectUtil.checkNotNull(command, "command");
+        tasks.add(checked);
+    }
+
+    /** Runs queued tasks, including ones added while draining, until the queue is empty. */
+    void runTasks() {
+        Runnable queued;
+        while ((queued = tasks.poll()) != null) {
+            queued.run();
+        }
+    }
+
+    /**
+     * Runs the scheduled tasks polled for the time taken at the start of the call.
+     *
+     * @return the value of {@code nextScheduledTaskNano()} once no further task could be polled
+     */
+    long runScheduledTasks() {
+        final long now = AbstractScheduledEventExecutor.nanoTime();
+        Runnable due;
+        while ((due = pollScheduledTask(now)) != null) {
+            due.run();
+        }
+        return nextScheduledTaskNano();
+    }
+
+    /** @return the value of {@code nextScheduledTaskNano()} */
+    long nextScheduledTask() {
+        final long nextNanos = nextScheduledTaskNano();
+        return nextNanos;
+    }
+
+    // Re-declared only so that it can be called from the channel in the same package.
+    @Override
+    protected void cancelScheduledTasks() {
+        super.cancelScheduledTasks();
+    }
+
+    /** Not supported. */
+    @Override
+    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Future<?> terminationFuture() {
+        return this.terminationFuture;
+    }
+
+    /** Not supported. */
+    @Override
+    @Deprecated
+    public void shutdown() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isShuttingDown() {
+        return false;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    /** @return always {@code false}, without waiting */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
+        return false;
+    }
+
+    @Override
+    public ChannelFuture register(Channel channel) {
+        final ChannelPromise registrationPromise = new DefaultChannelPromise(channel, this);
+        return register(registrationPromise);
+    }
+
+    @Override
+    public ChannelFuture register(ChannelPromise promise) {
+        ObjectUtil.checkNotNull(promise, "promise");
+        final Channel channel = promise.channel();
+        channel.unsafe().register(this, promise);
+        return promise;
+    }
+
+    @Deprecated
+    @Override
+    public ChannelFuture register(Channel channel, ChannelPromise promise) {
+        final Channel.Unsafe channelUnsafe = channel.unsafe();
+        channelUnsafe.register(this, promise);
+        return promise;
+    }
+
+    /** @return always {@code true} */
+    @Override
+    public boolean inEventLoop() {
+        return true;
+    }
+
+    /** @return always {@code true}, regardless of the given thread */
+    @Override
+    public boolean inEventLoop(Thread thread) {
+        return true;
+    }
+}
diff --git a/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java b/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
index 084ed7d..e7b206f 100644
--- a/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
+++ b/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
@@ -20,7 +20,10 @@ package org.apache.plc4x.java.transport.test;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.embedded.Plc4xEmbeddedChannel;
 import org.apache.plc4x.java.spi.configuration.HasConfiguration;
 import org.apache.plc4x.java.spi.connection.NettyChannelFactory;
 import org.slf4j.Logger;
@@ -45,11 +48,17 @@ public class TestChannelFactory extends NettyChannelFactory implements HasConfig
 
     @Override
     public Class<? extends Channel> getChannel() {
-        return EmbeddedChannel.class;
+        return Plc4xEmbeddedChannel.class;
+    }
+
+    /**
+     * Supplies no event loop group for the test transport.
+     * NOTE(review): presumably the embedded channel brings its own event loop (it only accepts a
+     * Plc4xEmbeddedEventLoop) - confirm how NettyChannelFactory treats a null group.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public EventLoopGroup getEventLoopGroup() {
+        return null;
     }
 
     @Override
     public void configureBootstrap(Bootstrap bootstrap) {
+        bootstrap.localAddress(new TestSocketAddress("lalala"));
         if(configuration != null) {
             logger.info("Configuring Bootstrap with {}", configuration);
         }
diff --git a/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
deleted file mode 100644
index ebff4aa..0000000
--- a/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.plc4x.java.mock.PlcMockDriver
diff --git a/plc4j/utils/test-utils/pom.xml b/plc4j/utils/test-utils/pom.xml
index c2e4843..9eb3cea 100644
--- a/plc4j/utils/test-utils/pom.xml
+++ b/plc4j/utils/test-utils/pom.xml
@@ -32,12 +32,44 @@
   <name>PLC4J: Utils: Test Utils</name>
   <description>A set of test utils. Especially defining the test-categories used to categorize tests.</description>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <usedDependencies combine.children="append">
+          </usedDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-api</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-spi</artifactId>
       <version>0.7.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-transport-test</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>commons-codec</groupId>
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
index c022949..5468ba1 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
@@ -18,10 +18,21 @@ under the License.
 */
 package org.apache.plc4x.test.driver;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import org.apache.plc4x.java.spi.generation.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.embedded.Plc4xEmbeddedChannel;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.spi.connection.ChannelExposingConnection;
+import org.apache.plc4x.java.spi.connection.GeneratedDriverBase;
+import org.apache.plc4x.java.spi.generation.*;
 import org.apache.plc4x.test.driver.exceptions.DriverTestsuiteException;
 import org.apache.plc4x.test.driver.model.DriverTestsuite;
+import org.apache.plc4x.test.driver.model.StepType;
 import org.apache.plc4x.test.driver.model.TestStep;
 import org.apache.plc4x.test.driver.model.Testcase;
 import org.dom4j.Document;
@@ -33,11 +44,13 @@ import org.junit.jupiter.api.DynamicTest;
 import org.junit.jupiter.api.TestFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xmlunit.builder.DiffBuilder;
+import org.xmlunit.diff.Diff;
 
-import javax.xml.stream.XMLStreamException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 public class DriverTestsuiteRunner {
 
@@ -86,13 +99,17 @@ public class DriverTestsuiteRunner {
             List<TestStep> setupSteps = new LinkedList<>();
             if(testsuiteXml.element(new QName("setup")) != null) {
                 Element setupElement = testsuiteXml.element(new QName("setup"));
-                setupSteps.add(new TestStep(setupElement));
+                for (Element element : setupElement.elements()) {
+                    setupSteps.add(parseTestStep(element));
+                }
             }
 
             List<TestStep> teardownSteps = new LinkedList<>();
             if(testsuiteXml.element(new QName("teardown")) != null) {
                 Element teardownElement = testsuiteXml.element(new QName("teardown"));
-                setupSteps.add(new TestStep(teardownElement));
+                for (Element element : teardownElement.elements()) {
+                    teardownSteps.add(parseTestStep(element)); // teardown steps belong in their own list, not in setupSteps
+                }
             }
 
             List<Element> testcasesXml = testsuiteXml.elements(new QName("testcase"));
@@ -107,28 +124,181 @@ public class DriverTestsuiteRunner {
 
                 List<TestStep> steps = new LinkedList<>();
                 for (Element element : stepsElement.elements()) {
-                    steps.add(new TestStep(element));
+                    steps.add(parseTestStep(element));
                 }
                 testcases.add(new Testcase(name, description, steps));
             }
             LOGGER.info(String.format("Found %d testcases.", testcases.size()));
-            return new DriverTestsuite(
-                testsuiteName, driverName, driverParameters, setupSteps, teardownSteps, testcases);
+
+            // Force the driver to not wait for the connection before returning the connection.
+            System.setProperty(GeneratedDriverBase.PROPERTY_PLC4X_FORCE_AWAIT_SETUP_COMPLETE, "false");
+
+            PlcConnection connection = getConnection(driverName, driverParameters);
+
+            TimeUnit.MILLISECONDS.sleep(200);
+
+            return new DriverTestsuite(testsuiteName, connection, setupSteps, teardownSteps, testcases);
         } catch (DocumentException e) {
             throw new DriverTestsuiteException("Error parsing testsuite xml", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DriverTestsuiteException("Interruption setting up testsuite xml", e);
+        }
+    }
+
+    /**
+     * Opens a connection for the given driver over the "test" transport.
+     * The driver parameters are appended to the connection string as a query string
+     * ({@code ?key=value&key=value}); nothing is appended if there are no parameters.
+     *
+     * @param driverName       protocol code of the driver, used as prefix of the connection string
+     * @param driverParameters parameters to pass to the driver; may be {@code null}
+     * @return the connection returned by the driver manager
+     * @throws DriverTestsuiteException if the driver manager fails to create the connection
+     */
+    private PlcConnection getConnection(String driverName, Map<String, String> driverParameters)
+        throws DriverTestsuiteException {
+        // Without any entry the joiner yields the empty string instead of a lone "?".
+        final StringJoiner query = new StringJoiner("&", "?", "");
+        query.setEmptyValue("");
+        if (driverParameters != null) {
+            for (Map.Entry<String, String> entry : driverParameters.entrySet()) {
+                query.add(entry.getKey() + "=" + entry.getValue());
+            }
+        }
+        final String connectionString = driverName + ":test://hurz" + query.toString();
+        try {
+            return new PlcDriverManager().getConnection(connectionString);
+        } catch (PlcConnectionException e) {
+            throw new DriverTestsuiteException("Error loading driver", e);
+        }
+    }
 
     /**
      * Runs one testcase: the suite's setup steps, then the steps of the testcase, then the suite's
      * teardown steps, all against the embedded channel of the suite's connection. The first failing
      * step aborts the run.
      *
      * @param testSuite the suite providing the connection and the setup/teardown steps
      * @param testcase  the testcase whose steps are executed
      * @throws DriverTestsuiteException if executing a step fails
      */
     private void run(DriverTestsuite testSuite, Testcase testcase) throws DriverTestsuiteException {
+        final Plc4xEmbeddedChannel channel = getEmbeddedChannel(testSuite);
+
+        final boolean hasSetupSteps = !testSuite.getSetupSteps().isEmpty();
+        if (hasSetupSteps) {
+            LOGGER.info("Running setup steps");
+            for (TestStep step : testSuite.getSetupSteps()) {
+                executeStep(step, channel);
+            }
+            LOGGER.info("Finished setup steps");
+        }
+
+        LOGGER.info("Running test steps");
         for (TestStep step : testcase.getSteps()) {
+            executeStep(step, channel);
+        }
+        LOGGER.info("Finished test steps");
+
+        final boolean hasTeardownSteps = !testSuite.getTeardownSteps().isEmpty();
+        if (hasTeardownSteps) {
+            LOGGER.info("Running teardown steps");
+            for (TestStep step : testSuite.getTeardownSteps()) {
+                executeStep(step, channel);
+            }
+            LOGGER.info("Finished teardown steps");
+        }
+    }
+
+    /**
+     * Executes a single test step against the embedded test channel.
+     * <ul>
+     *   <li>{@code SEND_PLC_MESSAGE}: takes the next message the driver wrote to the channel, parses it
+     *       and compares its XML form with the step's payload.</li>
+     *   <li>{@code RECEIVE_PLC_MESSAGE}: deserializes the step's payload into a message, serializes it
+     *       to bytes and feeds these into the channel as inbound data.</li>
+     *   <li>{@code DELAY}: sleeps for 200 milliseconds.</li>
+     *   <li>All other step types are not implemented yet and do nothing.</li>
+     * </ul>
+     * The message and IO classes are only resolved for the two message step types, so steps whose
+     * payload carries no {@code className} attribute no longer fail.
+     *
+     * @param testStep        the step to execute
+     * @param embeddedChannel the channel the driver under test is connected to
+     * @throws DriverTestsuiteException if the step fails or its payload can't be processed
+     */
+    private void executeStep(TestStep testStep, Plc4xEmbeddedChannel embeddedChannel) throws DriverTestsuiteException {
+        LOGGER.info("  - Running step {}", testStep.getType());
+        final Element payload = testStep.getPayload();
+        try {
+            switch (testStep.getType()) {
+                case SEND_PLC_MESSAGE: {
+                    verifyOutgoingMessage(payload, embeddedChannel);
+                    break;
+                }
+                case RECEIVE_PLC_MESSAGE: {
+                    injectIncomingMessage(payload, embeddedChannel);
+                    break;
+                }
+                case DELAY: {
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(200);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new DriverTestsuiteException("Interrupted during delay.", e);
+                    }
+                    break;
+                }
+                case SEND_PLC_BYTES:
+                case RECEIVE_PLC_BYTES:
+                case API_REQUEST:
+                case API_RESPONSE:
+                default: {
+                    // Not implemented yet.
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new DriverTestsuiteException("Error processing the xml", e);
+        } catch (IllegalAccessException | InstantiationException e) {
+            throw new DriverTestsuiteException("Error instantiating MessageIO class", e);
+        }
+        LOGGER.info("    Done");
+    }
+
+    /** Returns the payload's {@code className} attribute or fails with a descriptive exception. */
+    private String requireClassName(Element payload) throws DriverTestsuiteException {
+        final String className = (payload != null) ? payload.attributeValue(new QName("className")) : null;
+        if (className == null) {
+            throw new DriverTestsuiteException("Message steps require a payload element with a 'className' attribute");
+        }
+        return className;
+    }
+
+    /** Reads the next outbound message from the channel and compares its XML form with the payload. */
+    private void verifyOutgoingMessage(Element payload, Plc4xEmbeddedChannel embeddedChannel)
+        throws DriverTestsuiteException, IOException, IllegalAccessException, InstantiationException {
+        final String className = requireClassName(payload);
+        final String referenceXml = payload.asXML();
+        final MessageIO messageIO = getMessageIOType(className).newInstance();
+        final ObjectMapper mapper = new XmlMapper().enableDefaultTyping();
+
+        final ByteBuf byteBuf = embeddedChannel.readOutbound();
+        if (byteBuf == null) {
+            throw new DriverTestsuiteException("Expected an outgoing message, but the driver didn't send anything");
+        }
+        final byte[] data;
+        try {
+            data = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(data);
+        } finally {
+            // Messages read from the outbound queue are owned by the reader and have to be released.
+            byteBuf.release();
+        }
+
+        final ReadBuffer readBuffer = new ReadBuffer(data);
+        try {
+            final Object parsedOutput = messageIO.parse(readBuffer);
+            final String xmlString = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(parsedOutput);
+            final Diff diff = DiffBuilder.compare(referenceXml).withTest(xmlString).ignoreComments().ignoreWhitespace().build();
+            if (diff.hasDifferences()) {
+                LOGGER.warn("Actual message:\n{}", xmlString);
+                throw new DriverTestsuiteException("Differences were found after parsing.\n" + diff.toString());
+            }
+        } catch (ParseException e) {
+            throw new DriverTestsuiteException("Error parsing message", e);
+        }
+    }
+
+    /** Serializes the message described by the payload and feeds the bytes into the channel as inbound data. */
+    private void injectIncomingMessage(Element payload, Plc4xEmbeddedChannel embeddedChannel)
+        throws DriverTestsuiteException, IOException, IllegalAccessException, InstantiationException {
+        final String className = requireClassName(payload);
+        final String referenceXml = payload.asXML();
+        final MessageIO messageIO = getMessageIOType(className).newInstance();
+        final ObjectMapper mapper = new XmlMapper().enableDefaultTyping();
+
+        final Message message = mapper.readValue(referenceXml, getMessageType(className));
+        final WriteBuffer writeBuffer = new WriteBuffer(1024);
+        try {
+            messageIO.serialize(writeBuffer, message);
+            final byte[] data = new byte[writeBuffer.getPos()];
+            System.arraycopy(writeBuffer.getData(), 0, data, 0, writeBuffer.getPos());
+            // Data coming from the PLC has to travel the pipeline in inbound direction, wrapped in a ByteBuf.
+            embeddedChannel.writeInbound(io.netty.buffer.Unpooled.wrappedBuffer(data));
+        } catch (ParseException e) {
+            throw new DriverTestsuiteException("Error serializing message", e);
+        }
+    }
+
+    /**
+     * Converts a step element of the testsuite XML into a {@link TestStep}.
+     * The element name selects the {@link StepType} (upper-cased, with {@code -} replaced by
+     * {@code _}); the element's first child element becomes the payload of the step.
+     *
+     * @param curElement the step element
+     * @return the parsed step
+     * @throws IllegalArgumentException         if the element name matches no {@link StepType}
+     * @throws java.util.NoSuchElementException if the element has no child element
+     */
+    private TestStep parseTestStep(Element curElement) {
+        final String elementName = curElement.getName();
+        // Locale.ROOT keeps the conversion independent of the default locale (e.g. Turkish dotted I).
+        final StepType stepType = StepType.valueOf(elementName.toUpperCase(Locale.ROOT).replace('-', '_'));
+        final Element definition = curElement.elementIterator().next();
+        return new TestStep(stepType, definition);
+    }
+
+    /**
+     * Extracts the embedded test channel from the connection of the given testsuite.
+     *
+     * @param testSuite the testsuite whose connection is inspected
+     * @return the channel of the connection
+     * @throws PlcRuntimeException if the connection doesn't expose its channel or the channel is not
+     *                             a {@link Plc4xEmbeddedChannel}
+     */
+    private Plc4xEmbeddedChannel getEmbeddedChannel(DriverTestsuite testSuite) {
+        if (!(testSuite.getConnection() instanceof ChannelExposingConnection)) {
+            throw new PlcRuntimeException("Expecting ChannelExposingConnection");
+        }
+        final ChannelExposingConnection exposing = (ChannelExposingConnection) testSuite.getConnection();
+        final Channel candidate = exposing.getChannel();
+        if (!(candidate instanceof Plc4xEmbeddedChannel)) {
+            throw new PlcRuntimeException("Expecting EmbeddedChannel");
+        }
+        return (Plc4xEmbeddedChannel) candidate;
+    }
+
+    /**
+     * Loads the message class with the given fully qualified name.
+     *
+     * @param messageClassName fully qualified name of a class implementing {@link Message}
+     * @return the loaded class
+     * @throws DriverTestsuiteException if the name is {@code null}, the class can't be loaded or it
+     *                                  doesn't implement {@link Message}
+     */
+    private Class<? extends Message> getMessageType(String messageClassName) throws DriverTestsuiteException {
+        if (messageClassName == null) {
+            throw new DriverTestsuiteException("No message class name provided");
+        }
+        try {
+            final Class<?> messageClass = Class.forName(messageClassName);
+            if(Message.class.isAssignableFrom(messageClass)) {
+                // asSubclass performs the checked equivalent of the cast.
+                return messageClass.asSubclass(Message.class);
+            }
+            throw new DriverTestsuiteException("Message class must implement Message interface");
+        } catch (ClassNotFoundException e) {
+            throw new DriverTestsuiteException("Error loading message class", e);
+        }
+    }
+
+    /**
+     * Loads the IO class belonging to a message class. The IO class is expected in the {@code io}
+     * sub-package of the message class' package and to be named like the message class with the
+     * suffix {@code IO} (e.g. {@code a.b.Foo} maps to {@code a.b.io.FooIO}).
+     *
+     * @param messageClassName fully qualified name of the message class
+     * @return the loaded IO class
+     * @throws DriverTestsuiteException if the name is {@code null}, the IO class can't be loaded or it
+     *                                  doesn't implement {@link MessageIO}
+     */
+    private Class<? extends MessageIO> getMessageIOType(String messageClassName) throws DriverTestsuiteException {
+        if (messageClassName == null) {
+            throw new DriverTestsuiteException("No message class name provided");
+        }
+        // lastDot is -1 for a class in the default package; the IO class then lives in package "io".
+        final int lastDot = messageClassName.lastIndexOf('.');
+        final String ioPackage = (lastDot >= 0) ? messageClassName.substring(0, lastDot) + ".io." : "io.";
+        final String ioClassName = ioPackage + messageClassName.substring(lastDot + 1) + "IO";
+        try {
+            final Class<?> ioClass = Class.forName(ioClassName);
+            if(MessageIO.class.isAssignableFrom(ioClass)) {
+                // asSubclass performs the checked equivalent of the cast.
+                return ioClass.asSubclass(MessageIO.class);
             }
+            throw new DriverTestsuiteException("IO class must implement MessageIO interface");
+        } catch (ClassNotFoundException e) {
+            throw new DriverTestsuiteException("Error loading io class", e);
         }
     }
 
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
index 6b12554..8ee9dab 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
@@ -18,22 +18,21 @@ under the License.
 */
 package org.apache.plc4x.test.driver.model;
 
+import org.apache.plc4x.java.api.PlcConnection;
+
 import java.util.List;
-import java.util.Map;
 
 public class DriverTestsuite {
 
     private final String name;
-    private final String driverName;
-    private final Map<String, String> parameters;
+    private final PlcConnection connection;
     private final List<TestStep> setupSteps;
     private final List<TestStep> teardownSteps;
     private final List<Testcase> testcases;
 
-    public DriverTestsuite(String name, String driverName, Map<String, String> parameters, List<TestStep> setupSteps, List<TestStep> teardownSteps, List<Testcase> testcases) {
+    public DriverTestsuite(String name, PlcConnection connection, List<TestStep> setupSteps, List<TestStep> teardownSteps, List<Testcase> testcases) {
         this.name = name;
-        this.driverName = driverName;
-        this.parameters = parameters;
+        this.connection = connection;
         this.setupSteps = setupSteps;
         this.teardownSteps = teardownSteps;
         this.testcases = testcases;
@@ -43,12 +42,8 @@ public class DriverTestsuite {
         return name;
     }
 
-    public String getDriverName() {
-        return driverName;
-    }
-
-    public Map<String, String> getParameters() {
-        return parameters;
+    /**
+     * @return the connection that was passed to the constructor of this testsuite
+     */
+    public PlcConnection getConnection() {
+        return connection;
     }
 
     public List<TestStep> getSetupSteps() {
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
similarity index 59%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
index 92e281c..93d9f3f 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
@@ -18,18 +18,24 @@ under the License.
 */
 package org.apache.plc4x.test.driver.model;
 
-import org.dom4j.Element;
-
-public class TestStep {
-
-    private final Element payload;
-
-    public TestStep(Element payload) {
-        this.payload = payload;
-    }
-
-    public Element getPayload() {
-        return payload;
-    }
+public enum StepType {
+    // Kind of a single testsuite step; TestStep stores one of these next to its payload element.
+    /* Each arrow shows the direction drawn for that step type between the three parties below.
+                                PLC --------- PLC4X --------- Application
+        SEND_PLC_MESSAGE:           <--------
+        SEND_PLC_BYTES:             <--------
+        RECEIVE_PLC_MESSAGE:        -------->
+        RECEIVE_PLC_BYTES:          -------->
+        API_REQUEST:                                <--------
+        API_RESPONSE:                               -------->
+     */
+    // DELAY has no arrow in the diagram above. NOTE(review): presumably it only pauses the run - confirm in the runner.
+    SEND_PLC_MESSAGE,
+    SEND_PLC_BYTES,
+    RECEIVE_PLC_MESSAGE,
+    RECEIVE_PLC_BYTES,
+    API_REQUEST,
+    API_RESPONSE,
+    DELAY
 
 }
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
index 92e281c..1e9e37d 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
@@ -22,12 +22,18 @@ import org.dom4j.Element;
 
 public class TestStep {
 
+    private final StepType type;
     private final Element payload;
 
-    public TestStep(Element payload) {
+    public TestStep(StepType type, Element payload) {
+        this.type = type;
         this.payload = payload;
     }
 
+    public StepType getType() {
+        return type;
+    }
+
     public Element getPayload() {
         return payload;
     }
diff --git a/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd b/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
index 3ea8442..a97399d 100644
--- a/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
+++ b/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
@@ -49,10 +49,13 @@
                     <xs:complexType>
                         <xs:sequence minOccurs="1" maxOccurs="unbounded">
                             <xs:choice>
-                                <xs:element name="send-bytes" type="xs:anyType"/>
-                                <xs:element name="receive-bytes" type="xs:anyType"/>
-                                <xs:element name="plc4x-request" type="xs:anyType"/>
-                                <xs:element name="plc4x-response" type="xs:anyType"/>
+                                <xs:element name="send-plc-message" type="xs:anyType"/>
+                                <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+                                <xs:element name="receive-plc-message" type="xs:anyType"/>
+                                <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+                                <xs:element name="api-request" type="xs:anyType"/>
+                                <xs:element name="api-response" type="xs:anyType"/>
+                                <xs:element name="delay" type="xs:integer"/>
                             </xs:choice>
                         </xs:sequence>
                     </xs:complexType>
@@ -63,10 +66,13 @@
                     <xs:complexType>
                         <xs:sequence minOccurs="1" maxOccurs="unbounded">
                             <xs:choice>
-                                <xs:element name="send-bytes" type="xs:anyType"/>
-                                <xs:element name="receive-bytes" type="xs:anyType"/>
-                                <xs:element name="plc4x-request" type="xs:anyType"/>
-                                <xs:element name="plc4x-response" type="xs:anyType"/>
+                                <xs:element name="send-plc-message" type="xs:anyType"/>
+                                <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+                                <xs:element name="receive-plc-message" type="xs:anyType"/>
+                                <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+                                <xs:element name="api-request" type="xs:anyType"/>
+                                <xs:element name="api-response" type="xs:anyType"/>
+                                <xs:element name="delay" type="xs:integer"/>
                             </xs:choice>
                         </xs:sequence>
                     </xs:complexType>
@@ -84,10 +90,13 @@
                                 <xs:complexType>
                                     <xs:sequence minOccurs="1" maxOccurs="unbounded">
                                         <xs:choice>
-                                            <xs:element name="send-bytes" type="xs:anyType"/>
-                                            <xs:element name="receive-bytes" type="xs:anyType"/>
-                                            <xs:element name="plc4x-request" type="xs:anyType"/>
-                                            <xs:element name="plc4x-response" type="xs:anyType"/>
+                                            <xs:element name="send-plc-message" type="xs:anyType"/>
+                                            <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+                                            <xs:element name="receive-plc-message" type="xs:anyType"/>
+                                            <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+                                            <xs:element name="api-request" type="xs:anyType"/>
+                                            <xs:element name="api-response" type="xs:anyType"/>
+                                            <xs:element name="delay" type="xs:integer"/>
                                         </xs:choice>
                                     </xs:sequence>
                                 </xs:complexType>