You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/19 10:51:44 UTC
[plc4x] branch next-gen-core updated: Added Experimental
Annotation. - Implemented default methods for read, write, subscribe,
... - Direct communication from connection to protocol
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/next-gen-core by this push:
new 4681d64 Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol
4681d64 is described below
commit 4681d641ec997983d6d1036378ca540f7c9cfcb9
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Thu Dec 19 11:51:36 2019 +0100
Added Experimental Annotation.
- Implemented default methods for read, write, subscribe, ...
- Direct communication from connection to protocol
---
.../org/apache/plc4x/java/PlcDriverManager.java | 4 +-
.../java/api/{Changed.java => Experimental.java} | 2 +-
.../plc4x/java/api/PlcConnectionExtension.java | 83 ++++++++++++++++++
.../plc4x/java/api/messages/PlcReadResponse.java | 4 +-
.../plc4x/java/examples/helloplc4x/HelloPlc4x.java | 4 +-
.../src/main/resources/logback.xml | 2 +-
.../apache/plc4x/java/spi/Plc4xNettyWrapper.java | 50 +++++++----
.../apache/plc4x/java/spi/Plc4xProtocolBase.java | 45 +++++++---
.../java/spi/connection/AbstractPlcConnection.java | 98 ++++++++++++++++++++--
.../java/spi/connection/NettyPlcConnection.java | 8 ++
.../spi/internal/DefaultSendRequestContext.java | 6 +-
.../spi/GeneratedDriverByteToMessageCodec.java | 2 +
.../java/s7/readwrite/connection/S7Connection.java | 20 +----
.../s7/readwrite/protocol/Plc4xS7Protocol.java | 84 ++++++++++---------
14 files changed, 313 insertions(+), 99 deletions(-)
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
index 710cdb1..13efee1 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
@@ -18,7 +18,7 @@
*/
package org.apache.plc4x.java;
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -96,7 +96,7 @@ public class PlcDriverManager {
* @return Driver instance for the given protocol
* @throws PlcConnectionException If no Suitable Driver can be found
*/
- @Changed
+ @Experimental
public PlcDriver getDriver(String url) throws PlcConnectionException {
try {
URI connectionUri = new URI(url);
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
similarity index 96%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
index 5e3946b..8d2d8d3 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
@@ -22,5 +22,5 @@ package org.apache.plc4x.java.api;
/**
* Indicates that this is a recent API Change.
*/
-public @interface Changed {
+public @interface Experimental {
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
new file mode 100644
index 0000000..5f36f24
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.api;
+
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.concurrent.Future;
+
+/**
+ * Suggestion for new API
+ */
+@Experimental
+public interface PlcConnectionExtension {
+
+ /**
+ * <code>
+ * "%DB400.xxx"
+ * </code>
+ * <code>
+ * ["%DB400.xxx", ""]
+ * </code>
+ * <code>
+ * { "item1": "%DB400.xxx", "item2": "xx" }
+ * </code>
+ * Prepared Statement
+ * <code>
+ * { "item1": ?, "item2": ? }
+ * </code>
+ *
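+ * @param s the query expression, in one of the forms shown above
+ * @return a future for the query response; this default implementation only throws NotImplementedException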
+ */
+ @Experimental
+ default Future<NewPlcResponse> query(String s, Object... args) {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * <code>
+ * define { "item1": "%DB400.xxx", "item2": "xx" } AS my_struct
+ * </code>
+ * <code>
+ * define "%DB400" AS "my_structure"
+ * </code>
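+ * @param pql the definition statement, in one of the forms shown above
+ * @return a future for the outcome; this default implementation only throws NotImplementedException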
+ */
+ @Experimental
+ default Future<Boolean> execute(String pql) {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Planned successor for
+ * {@link org.apache.plc4x.java.api.messages.PlcResponse}
+ * {@link org.apache.plc4x.java.api.messages.PlcReadResponse}
+ * {@link org.apache.plc4x.java.api.messages.PlcWriteResponse}
+ * {@link org.apache.plc4x.java.api.messages.PlcSubscriptionResponse}
+ * {@link org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse}
+ */
+ @Experimental
+ interface NewPlcResponse {
+
+ }
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
index 0ab3547..0e56abd 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
@@ -18,7 +18,7 @@
*/
package org.apache.plc4x.java.api.messages;
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
import org.apache.plc4x.java.api.value.PlcValue;
import java.math.BigDecimal;
@@ -36,7 +36,7 @@ public interface PlcReadResponse extends PlcFieldResponse {
@Override
PlcReadRequest getRequest();
- @Changed
+ @Experimental
PlcValue getAsPlcValue();
int getNumberOfValues(String name);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 8f8b23e..972550b 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
public class HelloPlc4x {
@@ -79,7 +81,7 @@ public class HelloPlc4x {
// Read asynchronously ...
// Register a callback executed as soon as a response arrives.
logger.info("Asynchronous request ...");
- CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+ CompletionStage<? extends PlcReadResponse> asyncResponse = readRequest.execute();
asyncResponse.whenComplete((readResponse, throwable) -> {
if (readResponse != null) {
printResponse(readResponse);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
index 27d40c0..a8ddebb 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
+++ b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
@@ -29,7 +29,7 @@
</encoder>
</appender>
- <root level="info">
+ <root level="trace">
<appender-ref ref="STDOUT" />
</root>
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 7f21126..37f2145 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -20,6 +20,7 @@
package org.apache.plc4x.java.spi;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageCodec;
import io.vavr.control.Either;
import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -40,29 +41,50 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestContainer> {
+public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
private static final Logger logger = LoggerFactory.getLogger(Plc4xNettyWrapper.class);
private final Plc4xProtocolBase<T> protocolBase;
private final Queue<HandlerRegistration> registeredHandlers;
+ private final ChannelPipeline pipeline;
- public Plc4xNettyWrapper(Plc4xProtocolBase<T> parent, Class<T> clazz) {
- super(clazz, PlcRequestContainer.class);
+ public Plc4xNettyWrapper(ChannelPipeline pipeline, Plc4xProtocolBase<T> protocol, Class<T> clazz) {
+ super(clazz, Object.class);
+ this.pipeline = pipeline;
this.registeredHandlers = new ConcurrentLinkedQueue<>();
- this.protocolBase = parent;
+ this.protocolBase = protocol;
+ this.protocolBase.setContext(new ConversationContext<T>() {
+ @Override public void sendToWire(T msg) {
+ pipeline.writeAndFlush(msg);
+ }
+
+ @Override public void fireConnected() {
+ pipeline.fireUserEventTriggered(ConnectedEvent.class);
+ }
+
+ @Override public SendRequestContext<T> sendRequest(T packet) {
+ return new DefaultSendRequestContext<T>(handler -> {
+ logger.trace("Adding Response Handler...");
+ registeredHandlers.add(handler);
+ }, packet, this);
+ }
+ });
}
@Override
- protected void encode(ChannelHandlerContext channelHandlerContext, PlcRequestContainer plcRequestContainer, List<Object> list) throws Exception {
- logger.trace("Encoding {}", plcRequestContainer);
- protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
- @Override
- public void sendToWire(T msg) {
- logger.trace("Sending to wire {}", msg);
- list.add(msg);
- }
- }, plcRequestContainer);
+ protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> list) throws Exception {
+// logger.trace("Encoding {}", plcRequestContainer);
+// protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
+// @Override
+// public void sendToWire(T msg) {
+// logger.trace("Sending to wire {}", msg);
+// list.add(msg);
+// }
+// }, plcRequestContainer);
+ // NOOP
+ logger.info("Forwarding request to plc {}", msg);
+ list.add(msg);
}
@Override
@@ -145,7 +167,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestCon
return new DefaultSendRequestContext<T1>(handler -> {
logger.trace("Adding Response Handler...");
registeredHandlers.add(handler);
- }, packet, (DefaultConversationContext)this);
+ }, packet, this);
}
}
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
index f8fd0a2..24f51f6 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
@@ -19,27 +19,35 @@
package org.apache.plc4x.java.spi;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
public abstract class Plc4xProtocolBase<T> {
+ protected ConversationContext<T> context;
+
+ public void setContext(ConversationContext<T> context) {
+ this.context = context;
+ }
+
public void onConnect(ConversationContext<T> context) {
// Intentionally do nothing here
}
/**
- * @param context
- * @param msg
- * @throws Exception
- * @deprecated will be replaced by direct calls
- */
- @Deprecated
- protected abstract void encode(ConversationContext<T> context, PlcRequestContainer msg) throws Exception;
-
- /**
* TODO document me
* <p>
- * Can be used for non request incoming messages
+ * Can be used for non requested incoming messages
*
* @param context
* @param msg
@@ -48,4 +56,19 @@ public abstract class Plc4xProtocolBase<T> {
protected void decode(ConversationContext<T> context, T msg) throws Exception {
}
+ public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+ throw new NotImplementedException("");
+ }
+
+ public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+ throw new NotImplementedException("");
+ }
+
+ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+ throw new NotImplementedException("");
+ }
+
+ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+ throw new NotImplementedException("");
+ }
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
index 9c0a89a..4b3d2ac 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
@@ -21,14 +21,32 @@ package org.apache.plc4x.java.spi.connection;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
+import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcUnsubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
import org.apache.plc4x.java.spi.messages.InternalPlcMessage;
+import org.apache.plc4x.java.spi.messages.PlcReader;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.messages.PlcWriter;
+import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
/**
* Base class for implementing connections.
@@ -36,7 +54,31 @@ import java.util.concurrent.CompletableFuture;
* Concrete implementations should override the methods indicating connection capabilities
* and for obtaining respective request builders.
*/
-public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata {
+public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata, PlcReader, PlcWriter , PlcSubscriber {
+
+ private boolean canRead = false;
+ private boolean canWrite = false;
+ private boolean canSubscribe = false;
+ private PlcFieldHandler fieldHandler;
+ private Plc4xProtocolBase<?> protocol;
+
+ /**
+ * @deprecated only for compatibility reasons.
+ */
+ @Deprecated
+ public AbstractPlcConnection() {
+ }
+
+ public AbstractPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe, PlcFieldHandler fieldHandler) {
+ this.canRead = canRead;
+ this.canWrite = canWrite;
+ this.canSubscribe = canSubscribe;
+ this.fieldHandler = fieldHandler;
+ }
+
+ public void setProtocol(Plc4xProtocolBase<?> protocol) {
+ this.protocol = protocol;
+ }
@Override
public PlcConnectionMetadata getMetadata() {
@@ -52,37 +94,77 @@ public abstract class AbstractPlcConnection implements PlcConnection, PlcConnect
@Override
public boolean canRead() {
- return false;
+ return canRead;
}
@Override
public boolean canWrite() {
- return false;
+ return canWrite;
}
@Override
public boolean canSubscribe() {
- return false;
+ return canSubscribe;
+ }
+
+ public PlcFieldHandler getPlcFieldHandler() {
+ return this.fieldHandler;
}
@Override
public PlcReadRequest.Builder readRequestBuilder() {
- throw new PlcUnsupportedOperationException("The connection does not support reading");
+ if (!canRead()) {
+ throw new PlcUnsupportedOperationException("The connection does not support reading");
+ }
+ return new DefaultPlcReadRequest.Builder(this, getPlcFieldHandler());
}
@Override
public PlcWriteRequest.Builder writeRequestBuilder() {
- throw new PlcUnsupportedOperationException("The connection does not support writing");
+ if (!canWrite()) {
+ throw new PlcUnsupportedOperationException("The connection does not support writing");
+ }
+ return new DefaultPlcWriteRequest.Builder(this, getPlcFieldHandler());
}
@Override
public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
- throw new PlcUnsupportedOperationException("The connection does not support subscription");
+ if (!canSubscribe()) {
+ throw new PlcUnsupportedOperationException("The connection does not support subscription");
+ }
+ return new DefaultPlcSubscriptionRequest.Builder(this, getPlcFieldHandler());
}
@Override
public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
- throw new PlcUnsupportedOperationException("The connection does not support subscription");
+ if (!canSubscribe) {
+ throw new PlcUnsupportedOperationException("The connection does not support subscription");
+ }
+ return new DefaultPlcUnsubscriptionRequest.Builder(this);
+ }
+
+ @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+ return protocol.read(readRequest);
+ }
+
+ @Override public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+ return protocol.write(writeRequest);
+ }
+
+ @Override public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+ return protocol.subscribe(subscriptionRequest);
+ }
+
+ @Override public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+ return protocol.unsubscribe(unsubscriptionRequest);
+ }
+
+ @Override public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+ return null;
+ }
+
+ @Override public void unregister(PlcConsumerRegistration registration) {
+
}
/**
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
index 4a481f0..747b725 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
@@ -25,6 +25,7 @@ import io.netty.util.Timer;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -55,6 +56,13 @@ public abstract class NettyPlcConnection extends AbstractPlcConnection {
this.connected = false;
}
+ protected NettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler) {
+ super(true, true, false, handler);
+ this.channelFactory = channelFactory;
+ this.awaitSessionSetupComplete = awaitSessionSetupComplete;
+ this.connected = false;
+ }
+
@Override
public void connect() throws PlcConnectionException {
try {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 45bc4fb..0755259 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -42,7 +42,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
private final Object request;
@SuppressWarnings("unchecked")
- private final Plc4xNettyWrapper.DefaultConversationContext context;
+ private final ConversationContext context;
protected Class<?> expectClazz;
@@ -54,13 +54,13 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
protected Duration timeout;
- public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, Plc4xNettyWrapper<T>.DefaultConversationContext<T> context) {
+ public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, ConversationContext<T> context) {
this.finisher = finisher;
this.request = request;
this.context = context;
}
- protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+ protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, ConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
this.commands = commands;
this.timeout = timeout;
this.finisher = finisher;
diff --git a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
index 7282609..37cf142 100644
--- a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
+++ b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
@@ -45,10 +45,12 @@ public abstract class GeneratedDriverByteToMessageCodec<T extends Message> exten
WriteBuffer buffer = new WriteBuffer(packet.getLengthInBytes());
io.serialize(buffer, packet);
byteBuf.writeBytes(buffer.getData());
+ logger.debug("Sending bytes to PLC for message {} as data {}", packet, Hex.encodeHexString(buffer.getData()));
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
+ logger.trace("Receiving bytes, trying to decode Message...");
// As long as there is data available, continue checking the content.
while(byteBuf.readableBytes() > 0) {
// Check if enough data is present to process the entire package.
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
index 7677342..dfbbcec 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
@@ -72,7 +72,7 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
}
public S7Connection(ChannelFactory channelFactory, String params) {
- super(channelFactory, true);
+ super(channelFactory, true, new S7PlcFieldHandler());
short curRack = 1;
short curSlot = 1;
@@ -167,7 +167,8 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
pipeline.addLast(new S7Protocol());
Plc4xProtocolBase<TPKTPacket> plc4xS7Protocol = new Plc4xS7Protocol(callingTsapId, calledTsapId, tpduSize,
maxAmqCaller, maxAmqCallee, controllerType);
- Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(plc4xS7Protocol, TPKTPacket.class);
+ setProtocol(plc4xS7Protocol);
+ Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(pipeline, plc4xS7Protocol, TPKTPacket.class);
pipeline.addLast(context);
}
};
@@ -184,21 +185,6 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
}
@Override
- public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
- InternalPlcReadRequest internalReadRequest = checkInternal(readRequest, InternalPlcReadRequest.class);
- CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
- PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
- new PlcRequestContainer<>(internalReadRequest, future);
- channel.writeAndFlush(container).addListener(f -> {
- if (!f.isSuccess()) {
- future.completeExceptionally(f.cause());
- }
- });
- return future
- .thenApply(PlcReadResponse.class::cast);
- }
-
- @Override
public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
InternalPlcWriteRequest internalWriteRequest = checkInternal(writeRequest, InternalPlcWriteRequest.class);
CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
index 9c0e3fd..4b604e0 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
@@ -25,7 +25,15 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.s7.readwrite.*;
@@ -50,6 +58,8 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -128,6 +138,40 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
});
}
+ @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+ CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
+ DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
+ List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
+ for (PlcField field : request.getFields()) {
+ requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
+ }
+ final int tpduId = tpduGenerator.getAndIncrement();
+ TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
+ new S7MessageRequest(tpduId,
+ new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
+ new S7PayloadReadVarRequest()),
+ true, (short) tpduId));
+
+ context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+ .handle(p -> {
+ try {
+ future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+ } catch (PlcProtocolException e) {
+ e.printStackTrace();
+ }
+ });
+ return future;
+ }
+
private void extractControllerTypeAndFireConnected(ConversationContext<TPKTPacket> context, S7PayloadUserData payloadUserData) {
for (S7PayloadUserDataItem item : payloadUserData.getItems()) {
if (!(item instanceof S7PayloadUserDataItemCpuFunctionReadSzlResponse)) {
@@ -191,45 +235,7 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
}, null, (short) 0x0000, (short) 0x000F, COTPProtocolClass.CLASS_0);
}
- @Override
- protected void encode(ConversationContext<TPKTPacket> context, PlcRequestContainer msg) throws Exception {
- if (msg.getRequest() instanceof DefaultPlcReadRequest) {
- DefaultPlcReadRequest request = (DefaultPlcReadRequest) msg.getRequest();
- List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
- for (PlcField field : request.getFields()) {
- requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
- }
- final int tpduId = tpduGenerator.getAndIncrement();
- TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
- new S7MessageRequest(tpduId,
- new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
- new S7PayloadReadVarRequest()),
- true, (short) tpduId));
-
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
- // TODO: make it really optional
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
- .handle(p -> {
- try {
- msg.getResponseFuture().complete(decodeReadResponse(p, msg));
- } catch (PlcProtocolException e) {
- e.printStackTrace();
- }
- });
- } else {
- throw new NotImplementedException("Implement me");
- }
- }
-
- private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, PlcRequestContainer requestContainer) throws PlcProtocolException {
- InternalPlcReadRequest plcReadRequest = (InternalPlcReadRequest) requestContainer.getRequest();
-
+ private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, InternalPlcReadRequest plcReadRequest) throws PlcProtocolException {
S7PayloadReadVarResponse payload = (S7PayloadReadVarResponse) responseMessage.getPayload();
// If the numbers of items don't match, we're in big trouble as the only