You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/02/25 22:37:29 UTC
[plc4x] 04/04: - Worked on getting the test-channel to actually
work - Implemented a first somewhat working version of the driver-testsuite
execution code
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/driver-testsuite
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 4b651798775ee36ae38194b8e1e4615b14c21a02
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Feb 25 23:37:14 2020 +0100
- Worked on getting the test-channel to actually work
- Implemented a first somewhat working version of the driver-testsuite execution code
---
.../io/netty/bootstrap/EventLoopProvider.java} | 16 +-
.../java/io/netty/bootstrap/Plc4xBootstrap.java | 66 ++
.../spi/connection/ChannelExposingConnection.java} | 16 +-
.../spi/connection/DefaultNettyPlcConnection.java | 3 +-
.../java/spi/connection/NettyChannelFactory.java | 10 +-
plc4j/transports/test/pom.xml | 4 +
.../channel/embedded/Plc4xEmbeddedChannel.java | 892 +++++++++++++++++++++
.../channel/embedded/Plc4xEmbeddedEventLoop.java | 147 ++++
.../java/transport/test/TestChannelFactory.java | 11 +-
.../services/org.apache.plc4x.java.api.PlcDriver | 19 -
plc4j/utils/test-utils/pom.xml | 32 +
.../plc4x/test/driver/DriverTestsuiteRunner.java | 198 ++++-
.../plc4x/test/driver/model/DriverTestsuite.java | 19 +-
.../driver/model/{TestStep.java => StepType.java} | 32 +-
.../apache/plc4x/test/driver/model/TestStep.java | 8 +-
.../main/resources/schemas/driver-testsuite.xsd | 33 +-
16 files changed, 1405 insertions(+), 101 deletions(-)
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
similarity index 73%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
index 92e281c..1e2877b 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/spi/src/main/java/io/netty/bootstrap/EventLoopProvider.java
@@ -16,20 +16,12 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.test.driver.model;
+package io.netty.bootstrap;
-import org.dom4j.Element;
+import io.netty.channel.EventLoop;
-public class TestStep {
+public interface EventLoopProvider {
- private final Element payload;
-
- public TestStep(Element payload) {
- this.payload = payload;
- }
-
- public Element getPayload() {
- return payload;
- }
+ EventLoop getEventLoop();
}
diff --git a/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java b/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java
new file mode 100644
index 0000000..133f2cb
--- /dev/null
+++ b/plc4j/spi/src/main/java/io/netty/bootstrap/Plc4xBootstrap.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package io.netty.bootstrap;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+
+import java.net.SocketAddress;
+
+public class Plc4xBootstrap extends Bootstrap {
+
+    /**
+     * Validates the bootstrap configuration.
+     * <p>
+     * When the configured channel factory's string form contains {@code "EmbeddedChannel"},
+     * only the presence of a handler is checked. In every other case (including a missing
+     * channel factory) validation is delegated to the superclass.
+     *
+     * @return this bootstrap when the embedded-channel check passes, otherwise the result of
+     *         the superclass validation
+     * @throws IllegalStateException if an embedded channel is configured without a handler
+     */
+    @Override
+    public Bootstrap validate() {
+        final Object factory = channelFactory();
+        final boolean embedded = (factory != null) && factory.toString().contains("EmbeddedChannel");
+        if (!embedded) {
+            return super.validate();
+        }
+        if (config().handler() == null) {
+            throw new IllegalStateException("handler not set");
+        }
+        return this;
+    }
+
+    /**
+     * Initializes the channel. If no group has been configured yet and the channel
+     * implements {@link EventLoopProvider}, the channel's own event loop is adopted as
+     * this bootstrap's group before the superclass initialization runs.
+     *
+     * @param channel the channel being initialized
+     * @throws Exception propagated from the superclass initialization
+     */
+    @Override
+    void init(Channel channel) throws Exception {
+        final boolean groupMissing = (group == null);
+        if (groupMissing && channel instanceof EventLoopProvider) {
+            final EventLoopProvider provider = (EventLoopProvider) channel;
+            group = provider.getEventLoop();
+        }
+        super.init(channel);
+    }
+
+    /**
+     * Connects to the given address. When the bootstrap runs on an event loop whose simple
+     * class name is {@code Plc4xEmbeddedEventLoop}, the returned promise is additionally
+     * marked as successful right away.
+     *
+     * @param remoteAddress the address to connect to
+     * @return the future produced by the superclass connect call
+     */
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+        final ChannelFuture connectFuture = super.connect(remoteAddress);
+
+        // 'group' may still be unset here (no group configured and init(...) never reached);
+        // in that case the future is returned untouched instead of failing with an NPE.
+        // NOTE(review): the loop is matched by simple name, presumably because the class
+        // lives in the test transport module and is not visible from here - confirm.
+        final boolean embeddedLoop = (group != null)
+            && "Plc4xEmbeddedEventLoop".equals(group.getClass().getSimpleName());
+        if (embeddedLoop && (connectFuture instanceof ChannelPromise)) {
+            // trySuccess() rather than setSuccess(): setSuccess() throws IllegalStateException
+            // when the promise is already completed, which would hide the real outcome.
+            ((ChannelPromise) connectFuture).trySuccess();
+        }
+        return connectFuture;
+    }
+
+}
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
similarity index 73%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
index 92e281c..a9517c2 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelExposingConnection.java
@@ -16,20 +16,12 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.test.driver.model;
+package org.apache.plc4x.java.spi.connection;
-import org.dom4j.Element;
+import io.netty.channel.Channel;
-public class TestStep {
+public interface ChannelExposingConnection {
- private final Element payload;
-
- public TestStep(Element payload) {
- this.payload = payload;
- }
-
- public Element getPayload() {
- return payload;
- }
+ Channel getChannel();
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index 05e1b70..855bae9 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-public class DefaultNettyPlcConnection extends AbstractPlcConnection {
+public class DefaultNettyPlcConnection extends AbstractPlcConnection implements ChannelExposingConnection {
/**
* a {@link HashedWheelTimer} shall be only instantiated once.
@@ -145,6 +145,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection {
return connected && channel.isActive();
}
+ @Override
public Channel getChannel() {
return channel;
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
index 2d227c1..8bfc55a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
@@ -20,14 +20,13 @@
package org.apache.plc4x.java.spi.connection;
import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.Plc4xBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.spi.configuration.ConfigurationFactory;
-import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,10 +85,13 @@ public abstract class NettyChannelFactory implements ChannelFactory {
@Override
public Channel createChannel(ChannelHandler channelHandler) throws PlcConnectionException {
try {
+ Bootstrap bootstrap = new Plc4xBootstrap();
+
final EventLoopGroup workerGroup = getEventLoopGroup();
+ if(workerGroup != null) {
+ bootstrap.group(workerGroup);
+ }
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(workerGroup);
bootstrap.channel(getChannel());
// Callback to allow subclasses to modify the Bootstrap
diff --git a/plc4j/transports/test/pom.xml b/plc4j/transports/test/pom.xml
index cac99e6..0b3b767 100644
--- a/plc4j/transports/test/pom.xml
+++ b/plc4j/transports/test/pom.xml
@@ -40,6 +40,10 @@
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
diff --git a/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java
new file mode 100644
index 0000000..8241852
--- /dev/null
+++ b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedChannel.java
@@ -0,0 +1,892 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.embedded;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import io.netty.bootstrap.EventLoopProvider;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.DefaultChannelPipeline;
+import io.netty.channel.EventLoop;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.RecyclableArrayList;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+/**
+ * Base class for {@link Channel} implementations that are used in an embedded fashion.
+ */
+public class Plc4xEmbeddedChannel extends AbstractChannel implements EventLoopProvider {
+
+ private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
+ private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
+
+ private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
+ private enum State { OPEN, ACTIVE, CLOSED }
+
+    // Logger named after this class so that log output is attributed to Plc4xEmbeddedChannel
+    // rather than to Netty's EmbeddedChannel, from which this class was copied.
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Plc4xEmbeddedChannel.class);
+
+ private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
+ private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
+
+ private final Plc4xEmbeddedEventLoop loop = new Plc4xEmbeddedEventLoop();
+ private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ recordException(future);
+ }
+ };
+
+ private final ChannelMetadata metadata;
+ private final ChannelConfig config;
+
+ private Queue<Object> inboundMessages;
+ private Queue<Object> outboundMessages;
+ private Throwable lastException;
+ private State state;
+
+ /**
+ * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
+ */
+ public Plc4xEmbeddedChannel() {
+ this(EMPTY_HANDLERS);
+ }
+
+ /**
+ * Create a new instance with the specified ID and an empty pipeline.
+ *
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ */
+ public Plc4xEmbeddedChannel(ChannelId channelId) {
+ this(channelId, EMPTY_HANDLERS);
+ }
+
+ /**
+ * Create a new instance with the pipeline initialized with the specified handlers.
+ *
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(ChannelHandler... handlers) {
+ this(EmbeddedChannelId.INSTANCE, handlers);
+ }
+
+ /**
+ * Create a new instance with the pipeline initialized with the specified handlers.
+ *
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
+ this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
+ }
+
+ /**
+ * Create a new instance with the pipeline initialized with the specified handlers.
+ *
+ * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+ * constructor. If {@code false} the user will need to call {@link #register()}.
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
+ this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
+ }
+
+ /**
+ * Create a new instance with the channel ID set to the given ID and the pipeline
+ * initialized with the specified handlers.
+ *
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
+ this(channelId, false, handlers);
+ }
+
+ /**
+ * Create a new instance with the channel ID set to the given ID and the pipeline
+ * initialized with the specified handlers.
+ *
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
+ this(channelId, true, hasDisconnect, handlers);
+ }
+
+ /**
+ * Create a new instance with the channel ID set to the given ID and the pipeline
+ * initialized with the specified handlers.
+ *
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+ * constructor. If {@code false} the user will need to call {@link #register()}.
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
+ ChannelHandler... handlers) {
+ this(null, channelId, register, hasDisconnect, handlers);
+ }
+
+ /**
+ * Create a new instance with the channel ID set to the given ID and the pipeline
+ * initialized with the specified handlers.
+ *
+ * @param parent the parent {@link Channel} of this {@link Plc4xEmbeddedChannel}.
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
+ * constructor. If {@code false} the user will need to call {@link #register()}.
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
+ final ChannelHandler... handlers) {
+ super(parent, channelId);
+ metadata = metadata(hasDisconnect);
+ config = new DefaultChannelConfig(this);
+ setup(register, handlers);
+ }
+
+ /**
+ * Create a new instance with the channel ID set to the given ID and the pipeline
+ * initialized with the specified handlers.
+ *
+ * @param channelId the {@link ChannelId} that will be used to identify this channel
+ * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
+ * to {@link #close()}, {@code true} otherwise.
+ * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
+ * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
+ */
+ public Plc4xEmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
+ final ChannelHandler... handlers) {
+ super(null, channelId);
+ metadata = metadata(hasDisconnect);
+ this.config = ObjectUtil.checkNotNull(config, "config");
+ setup(true, handlers);
+ }
+
+ @Override
+ public EventLoop getEventLoop() {
+ return loop;
+ }
+
+ /**
+ * Always reports this channel as not registered, regardless of any registration
+ * performed through the constructor or {@link #register()}.
+ * NOTE(review): presumably this allows a bootstrap to register the channel itself - confirm.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ private static ChannelMetadata metadata(boolean hasDisconnect) {
+ return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
+ }
+
+ private void setup(boolean register, final ChannelHandler... handlers) {
+ ObjectUtil.checkNotNull(handlers, "handlers");
+ ChannelPipeline p = pipeline();
+ p.addLast(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ for (ChannelHandler h: handlers) {
+ if (h == null) {
+ break;
+ }
+ pipeline.addLast(h);
+ }
+ }
+ });
+ if (register) {
+ ChannelFuture future = loop.register(this);
+ assert future.isDone();
+ }
+ }
+
+ /**
+ * Register this {@code Channel} on its {@link EventLoop}.
+ */
+ public void register() throws Exception {
+ ChannelFuture future = loop.register(this);
+ assert future.isDone();
+ Throwable cause = future.cause();
+ if (cause != null) {
+ PlatformDependent.throwException(cause);
+ }
+ }
+
+ @Override
+ protected final DefaultChannelPipeline newChannelPipeline() {
+ return new EmbeddedChannelPipeline(this);
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return metadata;
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return config;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return state != State.CLOSED;
+ }
+
+ @Override
+ public boolean isActive() {
+ return state == State.ACTIVE;
+ }
+
+ /**
+ * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
+ */
+ public Queue<Object> inboundMessages() {
+ if (inboundMessages == null) {
+ inboundMessages = new ArrayDeque<Object>();
+ }
+ return inboundMessages;
+ }
+
+ /**
+ * @deprecated use {@link #inboundMessages()}
+ */
+ @Deprecated
+ public Queue<Object> lastInboundBuffer() {
+ return inboundMessages();
+ }
+
+ /**
+ * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
+ */
+ public Queue<Object> outboundMessages() {
+ if (outboundMessages == null) {
+ outboundMessages = new ArrayDeque<Object>();
+ }
+ return outboundMessages;
+ }
+
+ /**
+ * @deprecated use {@link #outboundMessages()}
+ */
+ @Deprecated
+ public Queue<Object> lastOutboundBuffer() {
+ return outboundMessages();
+ }
+
+ /**
+ * Return received data from this {@link Channel}
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T readInbound() {
+ T message = (T) poll(inboundMessages);
+ if (message != null) {
+ ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
+ }
+ return message;
+ }
+
+ /**
+ * Read data from the outbound. This may return {@code null} if nothing is readable.
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T readOutbound() {
+ T message = (T) poll(outboundMessages);
+ if (message != null) {
+ ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
+ }
+ return message;
+ }
+
+ /**
+ * Write messages to the inbound of this {@link Channel}.
+ *
+ * @param msgs the messages to be written
+ *
+ * @return {@code true} if the write operation did add something to the inbound buffer
+ */
+ public boolean writeInbound(Object... msgs) {
+ ensureOpen();
+ if (msgs.length == 0) {
+ return isNotEmpty(inboundMessages);
+ }
+
+ ChannelPipeline p = pipeline();
+ for (Object m: msgs) {
+ p.fireChannelRead(m);
+ }
+
+ flushInbound(false, voidPromise());
+ return isNotEmpty(inboundMessages);
+ }
+
+ /**
+ * Writes one message to the inbound of this {@link Channel} and does not flush it. This
+ * method is conceptually equivalent to {@link #write(Object)}.
+ *
+ * @see #writeOneOutbound(Object)
+ */
+ public ChannelFuture writeOneInbound(Object msg) {
+ return writeOneInbound(msg, newPromise());
+ }
+
+ /**
+ * Writes one message to the inbound of this {@link Channel} and does not flush it. This
+ * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
+ *
+ * @see #writeOneOutbound(Object, ChannelPromise)
+ */
+ public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
+ if (checkOpen(true)) {
+ pipeline().fireChannelRead(msg);
+ }
+ return checkException(promise);
+ }
+
+ /**
+ * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
+ *
+ * @see #flushOutbound()
+ */
+ public Plc4xEmbeddedChannel flushInbound() {
+ flushInbound(true, voidPromise());
+ return this;
+ }
+
+ private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
+ if (checkOpen(recordException)) {
+ pipeline().fireChannelReadComplete();
+ runPendingTasks();
+ }
+
+ return checkException(promise);
+ }
+
+ /**
+ * Write messages to the outbound of this {@link Channel}.
+ *
+ * @param msgs the messages to be written
+ * @return bufferReadable returns {@code true} if the write operation did add something to the outbound buffer
+ */
+ public boolean writeOutbound(Object... msgs) {
+ ensureOpen();
+ if (msgs.length == 0) {
+ return isNotEmpty(outboundMessages);
+ }
+
+ RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
+ try {
+ for (Object m: msgs) {
+ if (m == null) {
+ break;
+ }
+ futures.add(write(m));
+ }
+
+ flushOutbound0();
+
+ int size = futures.size();
+ for (int i = 0; i < size; i++) {
+ ChannelFuture future = (ChannelFuture) futures.get(i);
+ if (future.isDone()) {
+ recordException(future);
+ } else {
+ // The write may be delayed to run later by runPendingTasks()
+ future.addListener(recordExceptionListener);
+ }
+ }
+
+ checkException();
+ return isNotEmpty(outboundMessages);
+ } finally {
+ futures.recycle();
+ }
+ }
+
+ /**
+ * Writes one message to the outbound of this {@link Channel} and does not flush it. This
+ * method is conceptually equivalent to {@link #write(Object)}.
+ *
+ * @see #writeOneInbound(Object)
+ */
+ public ChannelFuture writeOneOutbound(Object msg) {
+ return writeOneOutbound(msg, newPromise());
+ }
+
+ /**
+ * Writes one message to the outbound of this {@link Channel} and does not flush it. This
+ * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
+ *
+ * @see #writeOneInbound(Object, ChannelPromise)
+ */
+ public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
+ if (checkOpen(true)) {
+ return write(msg, promise);
+ }
+ return checkException(promise);
+ }
+
+ /**
+ * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
+ *
+ * @see #flushInbound()
+ */
+ public Plc4xEmbeddedChannel flushOutbound() {
+ if (checkOpen(true)) {
+ flushOutbound0();
+ }
+ checkException(voidPromise());
+ return this;
+ }
+
+ private void flushOutbound0() {
+ // We need to call runPendingTasks first as a ChannelOutboundHandler may have used eventloop.execute(...) to
+ // delay the write until the next eventloop run.
+ runPendingTasks();
+
+ flush();
+ }
+
+ /**
+ * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
+ *
+ * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+ */
+ public boolean finish() {
+ return finish(false);
+ }
+
+ /**
+ * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
+ * Any further try to write data to it will fail.
+ *
+ * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+ */
+ public boolean finishAndReleaseAll() {
+ return finish(true);
+ }
+
+ /**
+ * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
+ *
+ * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
+ * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
+ */
+ private boolean finish(boolean releaseAll) {
+ close();
+ try {
+ checkException();
+ return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
+ } finally {
+ if (releaseAll) {
+ releaseAll(inboundMessages);
+ releaseAll(outboundMessages);
+ }
+ }
+ }
+
+ /**
+ * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
+ * otherwise.
+ */
+ public boolean releaseInbound() {
+ return releaseAll(inboundMessages);
+ }
+
+ /**
+ * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
+ * otherwise.
+ */
+ public boolean releaseOutbound() {
+ return releaseAll(outboundMessages);
+ }
+
+    /**
+     * Polls every message off the given queue and releases each one.
+     *
+     * @param queue the queue to drain; may be {@code null}
+     * @return {@code true} if the queue was non-null and non-empty before draining,
+     *         {@code false} otherwise
+     */
+    private static boolean releaseAll(Queue<Object> queue) {
+        if (!isNotEmpty(queue)) {
+            return false;
+        }
+        Object pending;
+        while ((pending = queue.poll()) != null) {
+            ReferenceCountUtil.release(pending);
+        }
+        return true;
+    }
+
+ private void finishPendingTasks(boolean cancel) {
+ runPendingTasks();
+ if (cancel) {
+ // Cancel all scheduled tasks that are left.
+ loop.cancelScheduledTasks();
+ }
+ }
+
+ @Override
+ public final ChannelFuture close() {
+ return close(newPromise());
+ }
+
+ @Override
+ public final ChannelFuture disconnect() {
+ return disconnect(newPromise());
+ }
+
+ @Override
+ public final ChannelFuture close(ChannelPromise promise) {
+ // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
+ // that needs to be run before the actual close takes place.
+ runPendingTasks();
+ ChannelFuture future = super.close(promise);
+
+ // Now finish everything else and cancel all scheduled tasks that are not yet ready to run.
+ finishPendingTasks(true);
+ return future;
+ }
+
+ @Override
+ public final ChannelFuture disconnect(ChannelPromise promise) {
+ ChannelFuture future = super.disconnect(promise);
+ finishPendingTasks(!metadata.hasDisconnect());
+ return future;
+ }
+
+ private static boolean isNotEmpty(Queue<Object> queue) {
+ return queue != null && !queue.isEmpty();
+ }
+
+ private static Object poll(Queue<Object> queue) {
+ return queue != null ? queue.poll() : null;
+ }
+
+ /**
+ * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
+ * for this {@link Channel}
+ */
+ public void runPendingTasks() {
+ try {
+ loop.runTasks();
+ } catch (Exception e) {
+ recordException(e);
+ }
+
+ try {
+ loop.runScheduledTasks();
+ } catch (Exception e) {
+ recordException(e);
+ }
+ }
+
+ /**
+ * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
+ * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
+ * {@code -1}.
+ */
+ public long runScheduledPendingTasks() {
+ try {
+ return loop.runScheduledTasks();
+ } catch (Exception e) {
+ recordException(e);
+ return loop.nextScheduledTask();
+ }
+ }
+
+ private void recordException(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ recordException(future.cause());
+ }
+ }
+
+    /**
+     * Remembers the given cause as the last exception. Only the first cause is kept;
+     * any further one is logged as a warning while the stored exception stays unchanged.
+     *
+     * @param cause the throwable to record
+     */
+    private void recordException(Throwable cause) {
+        if (lastException != null) {
+            logger.warn(
+                "More than one exception was raised. " +
+                    "Will report only the first one and log others.", cause);
+            return;
+        }
+        lastException = cause;
+    }
+
+    /**
+     * Completes the given promise according to whether an exception has been recorded.
+     * <p>
+     * With no recorded exception the promise is marked successful. Otherwise the recorded
+     * exception is cleared and then either rethrown (for a void promise) or used to fail
+     * the promise.
+     *
+     * @param promise the promise to complete
+     * @return the completed promise
+     */
+    private ChannelFuture checkException(ChannelPromise promise) {
+        final Throwable pending = lastException;
+        if (pending == null) {
+            return promise.setSuccess();
+        }
+
+        lastException = null;
+        if (promise.isVoid()) {
+            PlatformDependent.throwException(pending);
+        }
+        return promise.setFailure(pending);
+    }
+
+ /**
+ * Check if there was any {@link Throwable} received and if so rethrow it.
+ */
+ public void checkException() {
+ checkException(voidPromise());
+ }
+
+    /**
+     * Tells whether the {@link Channel} is open, optionally recording a
+     * {@link ClosedChannelException} when it is not.
+     *
+     * @param recordException whether to record an exception if the channel is closed
+     * @return {@code true} if the channel is open, {@code false} otherwise
+     */
+    private boolean checkOpen(boolean recordException) {
+        if (isOpen()) {
+            return true;
+        }
+        if (recordException) {
+            recordException(new ClosedChannelException());
+        }
+        return false;
+    }
+
+ /**
+ * Ensure the {@link Channel} is open and if not throw an exception.
+ */
+ protected final void ensureOpen() {
+ if (!checkOpen(true)) {
+ checkException();
+ }
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop loop) {
+ return loop instanceof Plc4xEmbeddedEventLoop;
+ }
+
+ @Override
+ protected SocketAddress localAddress0() {
+ return isActive()? LOCAL_ADDRESS : null;
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return isActive()? REMOTE_ADDRESS : null;
+ }
+
+ @Override
+ protected void doRegister() throws Exception {
+ state = State.ACTIVE;
+ }
+
+ @Override
+ protected void doBind(SocketAddress localAddress) throws Exception {
+ // NOOP
+ }
+
+ @Override
+ protected void doDisconnect() throws Exception {
+ if (!metadata.hasDisconnect()) {
+ doClose();
+ }
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ state = State.CLOSED;
+ }
+
+ @Override
+ protected void doBeginRead() throws Exception {
+ // NOOP
+ }
+
+ @Override
+ protected AbstractUnsafe newUnsafe() {
+ return new EmbeddedUnsafe();
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ return ((EmbeddedUnsafe) super.unsafe()).wrapped;
+ }
+
+    /**
+     * Drains the outbound buffer: each current message is retained, handed to
+     * {@link #handleOutboundMessage(Object)} and then removed from the buffer, until the
+     * buffer reports no current message.
+     * NOTE(review): the retain presumably balances a release performed by
+     * {@code in.remove()} - confirm against the Netty documentation.
+     *
+     * @param in the outbound buffer to drain
+     */
+    @Override
+    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+        Object current;
+        while ((current = in.current()) != null) {
+            ReferenceCountUtil.retain(current);
+            handleOutboundMessage(current);
+            in.remove();
+        }
+    }
+
+ /**
+ * Called for each outbound message.
+ *
+ * @see #doWrite(ChannelOutboundBuffer)
+ */
+ protected void handleOutboundMessage(Object msg) {
+ outboundMessages().add(msg);
+ }
+
+ /**
+ * Called for each inbound message.
+ */
+ protected void handleInboundMessage(Object msg) {
+ inboundMessages().add(msg);
+ }
+
+ private final class EmbeddedUnsafe extends AbstractUnsafe {
+
+ // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
+ // that may change the state of the Channel and may schedule tasks for later execution.
+ final Unsafe wrapped = new Unsafe() {
+ @Override
+ public RecvByteBufAllocator.Handle recvBufAllocHandle() {
+ return EmbeddedUnsafe.this.recvBufAllocHandle();
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return EmbeddedUnsafe.this.localAddress();
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return EmbeddedUnsafe.this.remoteAddress();
+ }
+
+ @Override
+ public void register(EventLoop eventLoop, ChannelPromise promise) {
+ EmbeddedUnsafe.this.register(eventLoop, promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void bind(SocketAddress localAddress, ChannelPromise promise) {
+ EmbeddedUnsafe.this.bind(localAddress, promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void disconnect(ChannelPromise promise) {
+ EmbeddedUnsafe.this.disconnect(promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void close(ChannelPromise promise) {
+ EmbeddedUnsafe.this.close(promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void closeForcibly() {
+ EmbeddedUnsafe.this.closeForcibly();
+ runPendingTasks();
+ }
+
+ @Override
+ public void deregister(ChannelPromise promise) {
+ EmbeddedUnsafe.this.deregister(promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void beginRead() {
+ EmbeddedUnsafe.this.beginRead();
+ runPendingTasks();
+ }
+
+ @Override
+ public void write(Object msg, ChannelPromise promise) {
+ EmbeddedUnsafe.this.write(msg, promise);
+ runPendingTasks();
+ }
+
+ @Override
+ public void flush() {
+ EmbeddedUnsafe.this.flush();
+ runPendingTasks();
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return EmbeddedUnsafe.this.voidPromise();
+ }
+
+ @Override
+ public ChannelOutboundBuffer outboundBuffer() {
+ return EmbeddedUnsafe.this.outboundBuffer();
+ }
+ };
+
+ @Override
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ safeSetSuccess(promise);
+ }
+ }
+
+ private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
+ EmbeddedChannelPipeline(Plc4xEmbeddedChannel channel) {
+ super(channel);
+ }
+
+ @Override
+ protected void onUnhandledInboundException(Throwable cause) {
+ recordException(cause);
+ }
+
+ @Override
+ protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
+ handleInboundMessage(msg);
+ }
+ }
+}
diff --git a/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java
new file mode 100644
index 0000000..d1aa600
--- /dev/null
+++ b/plc4j/transports/test/src/main/java/io/netty/channel/embedded/Plc4xEmbeddedEventLoop.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.embedded;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.*;
+import io.netty.util.internal.ObjectUtil;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Event loop that executes nothing on its own initiative.
+ * <p>
+ * Commands passed to {@link #execute(Runnable)} are only queued; they run when
+ * {@link #runTasks()} is called. Scheduled tasks run when {@link #runScheduledTasks()} is called.
+ * Both {@code inEventLoop} variants always answer {@code true}, the shutdown methods throw
+ * {@link UnsupportedOperationException} and the life-cycle queries always answer {@code false}.
+ */
+final class Plc4xEmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
+
+    /** Commands handed to {@link #execute(Runnable)} that have not been run yet. */
+    private final Queue<Runnable> tasks = new ArrayDeque<>(2);
+    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
+
+    @Override
+    public EventLoopGroup parent() {
+        return (EventLoopGroup) super.parent();
+    }
+
+    @Override
+    public EventLoop next() {
+        return (EventLoop) super.next();
+    }
+
+    /**
+     * Appends {@code command} to the task queue without running it.
+     *
+     * @param command the task to queue; {@code null} is rejected by {@code ObjectUtil.checkNotNull}
+     */
+    @Override
+    public void execute(Runnable command) {
+        ObjectUtil.checkNotNull(command, "command");
+        tasks.add(command);
+    }
+
+    /** Runs queued commands in queue order until the queue is empty. */
+    void runTasks() {
+        Runnable pending;
+        while ((pending = tasks.poll()) != null) {
+            pending.run();
+        }
+    }
+
+    /**
+     * Runs every task that {@code pollScheduledTask} hands out for the time sampled on entry.
+     *
+     * @return the value of {@code nextScheduledTaskNano()} once no further task is handed out
+     */
+    long runScheduledTasks() {
+        final long now = AbstractScheduledEventExecutor.nanoTime();
+        Runnable due = pollScheduledTask(now);
+        while (due != null) {
+            due.run();
+            due = pollScheduledTask(now);
+        }
+        return nextScheduledTaskNano();
+    }
+
+    /** @return the value of {@code nextScheduledTaskNano()} */
+    long nextScheduledTask() {
+        return nextScheduledTaskNano();
+    }
+
+    /** Adds no behaviour of its own; simply calls the inherited implementation. */
+    @Override
+    protected void cancelScheduledTasks() {
+        super.cancelScheduledTasks();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Future<?> terminationFuture() {
+        return terminationFuture;
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    @Deprecated
+    public void shutdown() {
+        throw new UnsupportedOperationException();
+    }
+
+    // The life-cycle queries below are hard-wired to false.
+
+    @Override
+    public boolean isShuttingDown() {
+        return false;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    /** Does not wait; returns {@code false} right away. */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
+        return false;
+    }
+
+    /** Registers {@code channel} using a fresh {@link DefaultChannelPromise} bound to this loop. */
+    @Override
+    public ChannelFuture register(Channel channel) {
+        final ChannelPromise registration = new DefaultChannelPromise(channel, this);
+        return register(registration);
+    }
+
+    /**
+     * Registers the promise's channel with this loop through the channel's {@code Unsafe}.
+     *
+     * @param promise promise to pass to the registration; must not be {@code null}
+     * @return the same {@code promise}
+     */
+    @Override
+    public ChannelFuture register(ChannelPromise promise) {
+        ObjectUtil.checkNotNull(promise, "promise");
+        final Channel target = promise.channel();
+        target.unsafe().register(this, promise);
+        return promise;
+    }
+
+    /** Registers {@code channel} with this loop and returns {@code promise}. */
+    @Deprecated
+    @Override
+    public ChannelFuture register(Channel channel, ChannelPromise promise) {
+        channel.unsafe().register(this, promise);
+        return promise;
+    }
+
+    /** Always {@code true}: every caller is treated as running inside this loop. */
+    @Override
+    public boolean inEventLoop() {
+        return true;
+    }
+
+    /** Always {@code true}, whatever {@code thread} is passed. */
+    @Override
+    public boolean inEventLoop(Thread thread) {
+        return true;
+    }
+}
diff --git a/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java b/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
index 084ed7d..e7b206f 100644
--- a/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
+++ b/plc4j/transports/test/src/main/java/org/apache/plc4x/java/transport/test/TestChannelFactory.java
@@ -20,7 +20,10 @@ package org.apache.plc4x.java.transport.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.embedded.Plc4xEmbeddedChannel;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.connection.NettyChannelFactory;
import org.slf4j.Logger;
@@ -45,11 +48,17 @@ public class TestChannelFactory extends NettyChannelFactory implements HasConfig
 @Override
 public Class<? extends Channel> getChannel() {
- return EmbeddedChannel.class;
+ return Plc4xEmbeddedChannel.class; // PLC4X's own embedded channel (io.netty.channel.embedded.Plc4xEmbeddedChannel)
+ }
+
+ @Override
+ public EventLoopGroup getEventLoopGroup() {
+ return null; // NOTE(review): no event loop group is supplied here - confirm that every caller of getEventLoopGroup() copes with null
 }
@Override
public void configureBootstrap(Bootstrap bootstrap) {
+ bootstrap.localAddress(new TestSocketAddress("lalala"));
if(configuration != null) {
logger.info("Configuring Bootstrap with {}", configuration);
}
diff --git a/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
deleted file mode 100644
index ebff4aa..0000000
--- a/plc4j/transports/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.plc4x.java.mock.PlcMockDriver
diff --git a/plc4j/utils/test-utils/pom.xml b/plc4j/utils/test-utils/pom.xml
index c2e4843..9eb3cea 100644
--- a/plc4j/utils/test-utils/pom.xml
+++ b/plc4j/utils/test-utils/pom.xml
@@ -32,12 +32,44 @@
<name>PLC4J: Utils: Test Utils</name>
<description>A set of test utils. Especially defining the test-categories used to categorize tests.</description>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies combine.children="append">
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
<dependencies>
<dependency>
<groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-api</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-spi</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-transport-test</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
<dependency>
<groupId>commons-codec</groupId>
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
index c022949..5468ba1 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
@@ -18,10 +18,21 @@ under the License.
*/
package org.apache.plc4x.test.driver;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import org.apache.plc4x.java.spi.generation.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.embedded.Plc4xEmbeddedChannel;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.spi.connection.ChannelExposingConnection;
+import org.apache.plc4x.java.spi.connection.GeneratedDriverBase;
+import org.apache.plc4x.java.spi.generation.*;
import org.apache.plc4x.test.driver.exceptions.DriverTestsuiteException;
import org.apache.plc4x.test.driver.model.DriverTestsuite;
+import org.apache.plc4x.test.driver.model.StepType;
import org.apache.plc4x.test.driver.model.TestStep;
import org.apache.plc4x.test.driver.model.Testcase;
import org.dom4j.Document;
@@ -33,11 +44,13 @@ import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xmlunit.builder.DiffBuilder;
+import org.xmlunit.diff.Diff;
-import javax.xml.stream.XMLStreamException;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.TimeUnit;
public class DriverTestsuiteRunner {
@@ -86,13 +99,17 @@ public class DriverTestsuiteRunner {
List<TestStep> setupSteps = new LinkedList<>();
if(testsuiteXml.element(new QName("setup")) != null) {
Element setupElement = testsuiteXml.element(new QName("setup"));
- setupSteps.add(new TestStep(setupElement));
+ for (Element element : setupElement.elements()) {
+ setupSteps.add(parseTestStep(element));
+ }
}
List<TestStep> teardownSteps = new LinkedList<>();
if(testsuiteXml.element(new QName("teardown")) != null) {
Element teardownElement = testsuiteXml.element(new QName("teardown"));
- setupSteps.add(new TestStep(teardownElement));
+ for (Element element : teardownElement.elements()) {
+ setupSteps.add(parseTestStep(element));
+ }
}
List<Element> testcasesXml = testsuiteXml.elements(new QName("testcase"));
@@ -107,28 +124,181 @@ public class DriverTestsuiteRunner {
List<TestStep> steps = new LinkedList<>();
for (Element element : stepsElement.elements()) {
- steps.add(new TestStep(element));
+ steps.add(parseTestStep(element));
}
testcases.add(new Testcase(name, description, steps));
}
LOGGER.info(String.format("Found %d testcases.", testcases.size()));
- return new DriverTestsuite(
- testsuiteName, driverName, driverParameters, setupSteps, teardownSteps, testcases);
+
+ // Force the driver to not wait for the connection before returning the connection.
+ System.setProperty(GeneratedDriverBase.PROPERTY_PLC4X_FORCE_AWAIT_SETUP_COMPLETE, "false");
+
+ PlcConnection connection = getConnection(driverName, driverParameters);
+
+ TimeUnit.MILLISECONDS.sleep(200);
+
+ return new DriverTestsuite(testsuiteName, connection, setupSteps, teardownSteps, testcases);
} catch (DocumentException e) {
throw new DriverTestsuiteException("Error parsing testsuite xml", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new DriverTestsuiteException("Interruption setting up testsuite xml", e);
+ }
+ }
+
+    /**
+     * Opens a connection for the given driver through the {@code test} transport.
+     * <p>
+     * The connection string has the form {@code <driverName>:test://hurz?k1=v1&k2=v2}; the query
+     * part is left out entirely when there are no parameters.
+     *
+     * @param driverName       protocol code placed in front of the connection string
+     * @param driverParameters parameters rendered as {@code key=value} pairs, may be {@code null}
+     * @return the connection returned by the {@link PlcDriverManager}
+     * @throws DriverTestsuiteException if the driver manager can't provide the connection
+     */
+    private PlcConnection getConnection(String driverName, Map<String, String> driverParameters)
+        throws DriverTestsuiteException {
+        // Prefix "?" and separator "&"; with nothing added the joiner renders as the empty string.
+        final StringJoiner query = new StringJoiner("&", "?", "");
+        query.setEmptyValue("");
+        if (driverParameters != null) {
+            driverParameters.forEach((key, value) -> query.add(key + "=" + value));
+        }
+        final String connectionString = driverName + ":test://hurz" + query;
+        try {
+            return new PlcDriverManager().getConnection(connectionString);
+        } catch (PlcConnectionException e) {
+            throw new DriverTestsuiteException("Error loading driver", e);
+        }
     }
 private void run(DriverTestsuite testSuite, Testcase testcase) throws DriverTestsuiteException {
- XmlMapper xmlMapper = new XmlMapper();
+ final Plc4xEmbeddedChannel embeddedChannel = getEmbeddedChannel(testSuite); // one channel is shared by setup, test and teardown steps
+ if(!testSuite.getSetupSteps().isEmpty()) {
+ LOGGER.info("Running setup steps");
+ for (TestStep setupStep : testSuite.getSetupSteps()) {
+ executeStep(setupStep, embeddedChannel);
+ }
+ LOGGER.info("Finished setup steps");
+ }
+ LOGGER.info("Running test steps");
 for (TestStep step : testcase.getSteps()) {
- String referenceXml = step.getPayload().asXML();
- try {
- final Message message = xmlMapper.readValue(referenceXml, Message.class);
- System.out.println(message);
- } catch (IOException e) {
- e.printStackTrace();
+ executeStep(step, embeddedChannel); // the first failing step aborts the run by throwing DriverTestsuiteException
+ }
+ LOGGER.info("Finished test steps");
+ if(!testSuite.getTeardownSteps().isEmpty()) { // NOTE(review): not reached when an earlier step throws - consider try/finally
+ LOGGER.info("Running teardown steps");
+ for (TestStep teardownStep : testSuite.getTeardownSteps()) {
+ executeStep(teardownStep, embeddedChannel);
+ }
+ LOGGER.info("Finished teardown steps");
+ }
+ }
+
+    /**
+     * Executes one test step against the embedded channel of the driver under test.
+     * <ul>
+     * <li>{@code SEND_PLC_MESSAGE}: takes the next outbound buffer from the channel, parses it and
+     * compares its XML form with the step's payload.</li>
+     * <li>{@code RECEIVE_PLC_MESSAGE}: deserializes the step's payload into a message, serializes
+     * that message to bytes and writes the bytes to the channel.</li>
+     * <li>{@code DELAY}: sleeps for 200 ms.</li>
+     * <li>All other step types currently do nothing.</li>
+     * </ul>
+     *
+     * @param testStep        step to run; message steps need a payload carrying a {@code className} attribute
+     * @param embeddedChannel channel used to exchange data with the driver
+     * @throws DriverTestsuiteException if no outbound data is available, a message can't be parsed,
+     *                                  serialized or instantiated, the parsed message differs from
+     *                                  the reference, or the thread is interrupted during a delay
+     */
+    private void executeStep(TestStep testStep, Plc4xEmbeddedChannel embeddedChannel) throws DriverTestsuiteException {
+        LOGGER.info(" - Running step {}", testStep.getType());
+        final ObjectMapper mapper = new XmlMapper().enableDefaultTyping();
+        try {
+            switch (testStep.getType()) {
+                case SEND_PLC_MESSAGE: {
+                    // The message classes are resolved per case, so steps without a
+                    // className attribute no longer fail before they are even looked at.
+                    final Element payload = testStep.getPayload();
+                    final String className = payload.attributeValue(new QName("className"));
+                    final MessageIO messageIO = getMessageIOType(className).newInstance();
+                    final ByteBuf byteBuf = embeddedChannel.readOutbound();
+                    if (byteBuf == null) {
+                        throw new DriverTestsuiteException("No outbound message available on the channel");
+                    }
+                    final byte[] data = new byte[byteBuf.readableBytes()];
+                    byteBuf.readBytes(data);
+                    // Buffers taken from the outbound queue have to be released by whoever reads them.
+                    byteBuf.release();
+                    try {
+                        final Object parsedOutput = messageIO.parse(new ReadBuffer(data));
+                        final String xmlString = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(parsedOutput);
+                        final Diff diff = DiffBuilder.compare(payload.asXML()).withTest(xmlString).ignoreComments().ignoreWhitespace().build();
+                        if (diff.hasDifferences()) {
+                            LOGGER.warn("Message sent by the driver:\n{}", xmlString);
+                            throw new DriverTestsuiteException("Differences were found after parsing.\n" + diff.toString());
+                        }
+                    } catch (ParseException e) {
+                        throw new DriverTestsuiteException("Error parsing message", e);
+                    }
+                    break;
+                }
+                case RECEIVE_PLC_MESSAGE: {
+                    final Element payload = testStep.getPayload();
+                    final String className = payload.attributeValue(new QName("className"));
+                    final MessageIO messageIO = getMessageIOType(className).newInstance();
+                    final Message message = mapper.readValue(payload.asXML(), getMessageType(className));
+                    final WriteBuffer writeBuffer = new WriteBuffer(1024);
+                    try {
+                        messageIO.serialize(writeBuffer, message);
+                        // Only the bytes written so far (up to the buffer position) are sent.
+                        final byte[] data = Arrays.copyOf(writeBuffer.getData(), writeBuffer.getPos());
+                        // NOTE(review): data coming from the PLC looks like inbound traffic -
+                        // confirm that writeOutbound (rather than writeInbound) is intended.
+                        embeddedChannel.writeOutbound(data);
+                    } catch (ParseException e) {
+                        throw new DriverTestsuiteException("Error serializing message", e);
+                    }
+                    break;
+                }
+                case DELAY:
+                    // Fixed pause; the content of the delay element is not evaluated.
+                    TimeUnit.MILLISECONDS.sleep(200);
+                    break;
+                default:
+                    // SEND_PLC_BYTES, RECEIVE_PLC_BYTES, API_REQUEST and API_RESPONSE are not implemented yet.
+                    break;
+            }
+        } catch (IOException e) {
+            throw new DriverTestsuiteException("Error processing the xml", e);
+        } catch (IllegalAccessException | InstantiationException e) {
+            throw new DriverTestsuiteException("Error instantiating MessageIO class", e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag and keep the cause for whoever handles the failure.
+            Thread.currentThread().interrupt();
+            throw new DriverTestsuiteException("Interrupted during delay.", e);
+        }
+        LOGGER.info(" Done");
+    }
+
+    /**
+     * Converts a step element of the testsuite document into a {@link TestStep}.
+     * <p>
+     * The element name selects the {@link StepType} (for example {@code send-plc-message} becomes
+     * {@code SEND_PLC_MESSAGE}); the element's first child element is used as the step's payload.
+     *
+     * @param curElement step element to convert
+     * @return the step with its type and payload
+     * @throws IllegalArgumentException         if the element name doesn't match any {@link StepType}
+     * @throws java.util.NoSuchElementException if the element has no child element
+     */
+    private TestStep parseTestStep(Element curElement) {
+        // Locale.ROOT keeps the enum lookup independent of the JVM's default locale
+        // (in a Turkish locale "i" would otherwise be upper-cased to a dotted capital I).
+        final String enumName = curElement.getName().toUpperCase(Locale.ROOT).replace('-', '_');
+        final StepType stepType = StepType.valueOf(enumName);
+        final Element definition = curElement.elementIterator().next();
+        return new TestStep(stepType, definition);
+    }
+
+    /**
+     * Digs the embedded channel out of the testsuite's connection.
+     *
+     * @param testSuite testsuite whose connection is inspected
+     * @return the connection's channel, cast to {@link Plc4xEmbeddedChannel}
+     * @throws PlcRuntimeException if the connection is no {@link ChannelExposingConnection} or its
+     *                             channel is no {@link Plc4xEmbeddedChannel}
+     */
+    private Plc4xEmbeddedChannel getEmbeddedChannel(DriverTestsuite testSuite) {
+        if (!(testSuite.getConnection() instanceof ChannelExposingConnection)) {
+            throw new PlcRuntimeException("Expecting ChannelExposingConnection");
+        }
+        final Channel nettyChannel = ((ChannelExposingConnection) testSuite.getConnection()).getChannel();
+        if (!(nettyChannel instanceof Plc4xEmbeddedChannel)) {
+            throw new PlcRuntimeException("Expecting EmbeddedChannel");
+        }
+        return (Plc4xEmbeddedChannel) nettyChannel;
+    }
+
+    /**
+     * Loads the message class with the given name.
+     *
+     * @param messageClassName fully qualified name of the class to load
+     * @return the loaded class, typed as a {@link Message} implementation
+     * @throws DriverTestsuiteException if the class can't be found or doesn't implement {@link Message}
+     */
+    private Class<? extends Message> getMessageType(String messageClassName) throws DriverTestsuiteException {
+        try {
+            final Class<?> messageClass = Class.forName(messageClassName);
+            if (Message.class.isAssignableFrom(messageClass)) {
+                // asSubclass is the checked counterpart of a plain (unchecked) cast.
+                return messageClass.asSubclass(Message.class);
+            }
+            throw new DriverTestsuiteException("Message class must implement Message interface");
+        } catch (ClassNotFoundException e) {
+            throw new DriverTestsuiteException("Error loading message class", e);
+        }
+    }
+
+    /**
+     * Loads the IO class belonging to a message class.
+     * <p>
+     * The IO class name is derived from the message class name: {@code a.b.Foo} maps to
+     * {@code a.b.io.FooIO}.
+     *
+     * @param messageClassName fully qualified name of the message class
+     * @return the loaded class, typed as a {@link MessageIO} implementation
+     * @throws DriverTestsuiteException if the name is {@code null} or has no package part, or if the
+     *                                  IO class can't be found or doesn't implement {@link MessageIO}
+     */
+    private Class<? extends MessageIO> getMessageIOType(String messageClassName) throws DriverTestsuiteException {
+        // Without a package separator the ".io." sub-package can't be derived.
+        final int lastDot = (messageClassName == null) ? -1 : messageClassName.lastIndexOf('.');
+        if (lastDot < 0) {
+            throw new DriverTestsuiteException("Message class name must be fully qualified, but was: " + messageClassName);
+        }
+        final String ioClassName = messageClassName.substring(0, lastDot) + ".io." +
+            messageClassName.substring(lastDot + 1) + "IO";
+        try {
+            final Class<?> ioClass = Class.forName(ioClassName);
+            if (MessageIO.class.isAssignableFrom(ioClass)) {
+                return ioClass.asSubclass(MessageIO.class);
             }
+            throw new DriverTestsuiteException("IO class must implement MessageIO interface");
+        } catch (ClassNotFoundException e) {
+            throw new DriverTestsuiteException("Error loading io class", e);
         }
     }
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
index 6b12554..8ee9dab 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
@@ -18,22 +18,21 @@ under the License.
*/
package org.apache.plc4x.test.driver.model;
+import org.apache.plc4x.java.api.PlcConnection;
+
import java.util.List;
-import java.util.Map;
public class DriverTestsuite {
private final String name;
- private final String driverName;
- private final Map<String, String> parameters;
+ private final PlcConnection connection;
private final List<TestStep> setupSteps;
private final List<TestStep> teardownSteps;
private final List<Testcase> testcases;
- public DriverTestsuite(String name, String driverName, Map<String, String> parameters, List<TestStep> setupSteps, List<TestStep> teardownSteps, List<Testcase> testcases) {
+ public DriverTestsuite(String name, PlcConnection connection, List<TestStep> setupSteps, List<TestStep> teardownSteps, List<Testcase> testcases) {
this.name = name;
- this.driverName = driverName;
- this.parameters = parameters;
+ this.connection = connection;
this.setupSteps = setupSteps;
this.teardownSteps = teardownSteps;
this.testcases = testcases;
@@ -43,12 +42,8 @@ public class DriverTestsuite {
return name;
}
- public String getDriverName() {
- return driverName;
- }
-
- public Map<String, String> getParameters() {
- return parameters;
+ public PlcConnection getConnection() {
+ return connection;
}
public List<TestStep> getSetupSteps() {
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
similarity index 59%
copy from plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
copy to plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
index 92e281c..93d9f3f 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/StepType.java
@@ -18,18 +18,24 @@ under the License.
*/
package org.apache.plc4x.test.driver.model;
-import org.dom4j.Element;
-
-public class TestStep {
-
- private final Element payload;
-
- public TestStep(Element payload) {
- this.payload = payload;
- }
-
- public Element getPayload() {
- return payload;
- }
+public enum StepType {
+
+ /*
+  Direction of each step type between the three parties. The arrow columns are reconstructed
+  from the step names; NOTE(review): confirm the placement of the API_* arrows.
+
+                         PLC --------- PLC4X --------- Application
+  SEND_PLC_MESSAGE:          <--------
+  SEND_PLC_BYTES:            <--------
+  RECEIVE_PLC_MESSAGE:       -------->
+  RECEIVE_PLC_BYTES:         -------->
+  API_REQUEST:                             <--------
+  API_RESPONSE:                            -------->
+  DELAY:                 no traffic
+ */
+
+ SEND_PLC_MESSAGE,
+ SEND_PLC_BYTES,
+ RECEIVE_PLC_MESSAGE,
+ RECEIVE_PLC_BYTES,
+ API_REQUEST,
+ API_RESPONSE,
+ DELAY
 }
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
index 92e281c..1e9e37d 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/TestStep.java
@@ -22,12 +22,18 @@ import org.dom4j.Element;
public class TestStep {
+ private final StepType type;
private final Element payload;
- public TestStep(Element payload) {
+ public TestStep(StepType type, Element payload) {
+ this.type = type;
this.payload = payload;
}
+ public StepType getType() {
+ return type;
+ }
+
public Element getPayload() {
return payload;
}
diff --git a/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd b/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
index 3ea8442..a97399d 100644
--- a/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
+++ b/plc4j/utils/test-utils/src/main/resources/schemas/driver-testsuite.xsd
@@ -49,10 +49,13 @@
<xs:complexType>
<xs:sequence minOccurs="1" maxOccurs="unbounded">
<xs:choice>
- <xs:element name="send-bytes" type="xs:anyType"/>
- <xs:element name="receive-bytes" type="xs:anyType"/>
- <xs:element name="plc4x-request" type="xs:anyType"/>
- <xs:element name="plc4x-response" type="xs:anyType"/>
+ <xs:element name="send-plc-message" type="xs:anyType"/>
+ <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="receive-plc-message" type="xs:anyType"/>
+ <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="api-request" type="xs:anyType"/>
+ <xs:element name="api-response" type="xs:anyType"/>
+ <xs:element name="delay" type="xs:integer"/>
</xs:choice>
</xs:sequence>
</xs:complexType>
@@ -63,10 +66,13 @@
<xs:complexType>
<xs:sequence minOccurs="1" maxOccurs="unbounded">
<xs:choice>
- <xs:element name="send-bytes" type="xs:anyType"/>
- <xs:element name="receive-bytes" type="xs:anyType"/>
- <xs:element name="plc4x-request" type="xs:anyType"/>
- <xs:element name="plc4x-response" type="xs:anyType"/>
+ <xs:element name="send-plc-message" type="xs:anyType"/>
+ <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="receive-plc-message" type="xs:anyType"/>
+ <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="api-request" type="xs:anyType"/>
+ <xs:element name="api-response" type="xs:anyType"/>
+ <xs:element name="delay" type="xs:integer"/>
</xs:choice>
</xs:sequence>
</xs:complexType>
@@ -84,10 +90,13 @@
<xs:complexType>
<xs:sequence minOccurs="1" maxOccurs="unbounded">
<xs:choice>
- <xs:element name="send-bytes" type="xs:anyType"/>
- <xs:element name="receive-bytes" type="xs:anyType"/>
- <xs:element name="plc4x-request" type="xs:anyType"/>
- <xs:element name="plc4x-response" type="xs:anyType"/>
+ <xs:element name="send-plc-message" type="xs:anyType"/>
+ <xs:element name="send-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="receive-plc-message" type="xs:anyType"/>
+ <xs:element name="receive-plc-bytes" type="xs:hexBinary"/>
+ <xs:element name="api-request" type="xs:anyType"/>
+ <xs:element name="api-response" type="xs:anyType"/>
+ <xs:element name="delay" type="xs:integer"/>
</xs:choice>
</xs:sequence>
</xs:complexType>