You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/12/04 11:46:39 UTC
[plc4x] branch develop updated: feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 5d4eb0a feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
5d4eb0a is described below
commit 5d4eb0a7f32e1a959d6020214fb8f79db55e505b
Author: Richard Meister <ri...@gmail.com>
AuthorDate: Sat Dec 4 12:46:31 2021 +0100
feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
* protocols: ADS - split reserved field in two fields
The AddDeviceNotification message has an 128 bit reserved field and
sending the message throws a ParseException: "Unsigned Big Integer
can only contain max 64 bits"
So the reserved field is split into two 64 bit fields.
* plc4j: driver-ads returns true on canSubscribe()
* plc4j: add subscriptions to driver-ads
Co-authored-by: Sebastian Rühl <sr...@apache.org>
---
.../org/apache/plc4x/java/ads/ADSPlcDriver.java | 5 +
.../plc4x/java/ads/protocol/AdsProtocolLogic.java | 280 ++++++++++++++++++++-
.../ads/src/main/resources/protocols/ads/ads.mspec | 3 +-
3 files changed, 280 insertions(+), 8 deletions(-)
diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
index 7057653..adaf58e 100644
--- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
+++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
@@ -64,6 +64,11 @@ public class ADSPlcDriver extends GeneratedDriverBase<AmsTCPPacket> {
}
@Override
+ protected boolean canSubscribe() {
+ return true;
+ }
+
+ @Override
protected Class<? extends Configuration> getConfigurationType() {
return AdsConfiguration.class;
}
diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
index 6a84d05..0c1332c 100644
--- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
+++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
@@ -20,38 +20,43 @@ package org.apache.plc4x.java.ads.protocol;
import org.apache.plc4x.java.ads.configuration.AdsConfiguration;
import org.apache.plc4x.java.ads.field.*;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
import org.apache.plc4x.java.ads.readwrite.*;
import org.apache.plc4x.java.ads.readwrite.io.DataItemIO;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.generation.*;
-import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
-import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
+import org.apache.plc4x.java.spi.messages.*;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.values.IEC61131ValueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration> {
+public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration>, PlcSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(AdsProtocolLogic.class);
@@ -62,6 +67,8 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
private final AtomicLong invokeIdGenerator = new AtomicLong(1);
private RequestTransactionManager tm;
+ private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
+
private final ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
private final ConcurrentHashMap<SymbolicAdsField, CompletableFuture<Void>> pendingResolutionRequests;
@@ -550,6 +557,265 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
return new DefaultPlcWriteResponse(writeRequest, responseCodes);
}
+ @Override
+ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+ // Get all ADS addresses in their resolved state.
+ final CompletableFuture<List<DirectAdsField>> directAdsFieldsFuture =
+ getDirectAddresses(subscriptionRequest.getFields()
+ .stream()
+ .map(field -> ((DefaultPlcSubscriptionField) field).getPlcField())
+ .collect(Collectors.toList()));
+
+ // If all addresses were already resolved we can send the request immediately.
+ if (directAdsFieldsFuture.isDone()) {
+ final List<DirectAdsField> fields = directAdsFieldsFuture.getNow(null);
+ if (fields != null) {
+ return executeSubscribe(subscriptionRequest);
+ } else {
+ final CompletableFuture<PlcSubscriptionResponse> errorFuture = new CompletableFuture<>();
+ errorFuture.completeExceptionally(new PlcException("Fields are null"));
+ return errorFuture;
+ }
+ }
+ // If there are still symbolic addresses that have to be resolved, send the
+ // request as soon as the resolution is done.
+ // In order to instantly be able to return a future, for the final result we have to
+ // create a new one which is then completed later on. Unfortunately as soon as the
+ // directAdsFieldsFuture is completed we still don't have the end result, but we can
+ // now actually send the delayed read request ... as soon as that future completes
+ // we can complete the initial one.
+ else {
+ CompletableFuture<PlcSubscriptionResponse> delayedSubscribe = new CompletableFuture<>();
+ directAdsFieldsFuture.handle((directAdsFields, throwable) -> {
+ if (directAdsFields != null) {
+ final CompletableFuture<PlcSubscriptionResponse> delayedResponse =
+ executeSubscribe(subscriptionRequest);
+ delayedResponse.handle((plcSubscribeResponse, throwable1) -> {
+ if (plcSubscribeResponse != null) {
+ delayedSubscribe.complete(plcSubscribeResponse);
+ } else {
+ delayedSubscribe.completeExceptionally(throwable1);
+ }
+ return this;
+ });
+ } else {
+ delayedSubscribe.completeExceptionally(throwable);
+ }
+ return this;
+ });
+ return delayedSubscribe;
+ }
+ }
+
+ private CompletableFuture<PlcSubscriptionResponse> executeSubscribe(PlcSubscriptionRequest subscribeRequest) {
+ CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
+
+ List<AdsData> adsData = subscribeRequest.getFields().stream()
+ .map(field -> (DefaultPlcSubscriptionField) field)
+ .map(field -> new AdsAddDeviceNotificationRequest(
+ symbolicFieldMapping.get(field.getPlcField()).getIndexGroup(),
+ symbolicFieldMapping.get(field.getPlcField()).getIndexOffset(),
+ (long) ((AdsField) field.getPlcField()).getAdsDataType().getNumBytes() * field.getNumberOfElements(),
+ field.getPlcSubscriptionType() == PlcSubscriptionType.CYCLIC ? 3 : 4, // if it's not cyclic, it's on change or event
+ 0 , // there is no api for that yet
+ field.getDuration().orElse(Duration.ZERO).toMillis()))
+ .collect(Collectors.toList());
+
+ List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data ->
+ new AmsTCPPacket(new AmsPacket(
+ configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
+ configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
+ CommandId.ADS_ADD_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());
+
+ Map<String, ResponseItem<PlcSubscriptionHandle>> responses = new HashMap<>();
+
+ // Start the first request-transaction (it is ended in the response-handler).
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ transaction.submit(subscribeRecursively(
+ subscribeRequest,
+ subscribeRequest.getFieldNames().iterator(),
+ responses,
+ future,
+ amsTCPPackets.iterator(),
+ transaction));
+ return future;
+ }
+
+ private Runnable subscribeRecursively(PlcSubscriptionRequest subscriptionRequest, Iterator<String> fieldNames,
+ Map<String, ResponseItem<PlcSubscriptionHandle>> responses,
+ CompletableFuture<PlcSubscriptionResponse> future,
+ Iterator<AmsTCPPacket> amsTCPPackets,
+ RequestTransactionManager.RequestTransaction transaction) {
+ return () -> {
+ AmsTCPPacket packet = amsTCPPackets.next();
+ boolean hasMorePackets = amsTCPPackets.hasNext();
+ String fieldName = fieldNames.next();
+ context.sendRequest(packet)
+ .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
+ .unwrap(response -> (AdsAddDeviceNotificationResponse) response.getUserdata().getData())
+ .handle(responseAdsData -> {
+ if (responseAdsData.getResult() == ReturnCode.OK) {
+ // Collect notification handle from individual response.
+ responses.put(fieldName, new ResponseItem<>(
+ parsePlcResponseCode(responseAdsData.getResult()),
+ new AdsSubscriptionHandle(this,
+ fieldName,
+ ((AdsField) ((DefaultPlcSubscriptionField) subscriptionRequest.getField(fieldName)).getPlcField()).getAdsDataType(),
+ responseAdsData.getNotificationHandle())));
+
+ // After receiving the last ADD_DEVICE_NOTIFICATION response, complete the PLC4X response.
+ if (!hasMorePackets) {
+ final PlcSubscriptionResponse plcSubscriptionResponse = new DefaultPlcSubscriptionResponse(subscriptionRequest, responses);
+ future.complete(plcSubscriptionResponse);
+ }
+ } else {
+ if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_INVALIDSIZE) {
+ future.completeExceptionally(
+ new PlcException("The parameter size was not correct (Internal error)"));
+ } else {
+ future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
+ }
+ }
+ // Finish the request-transaction.
+ transaction.endRequest();
+
+ // Submit the next transaction.
+ if (hasMorePackets) {
+ RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
+ nextTransaction.submit(subscribeRecursively(
+ subscriptionRequest, fieldNames, responses, future, amsTCPPackets, nextTransaction));
+ }
+ });
+ };
+ }
+
+ @Override
+ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+ CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
+
+ List<Long> notificationHandles = new ArrayList<>();
+ unsubscriptionRequest.getSubscriptionHandles().stream()
+ .filter(handle -> handle instanceof AdsSubscriptionHandle)
+ .map(handle -> (AdsSubscriptionHandle) handle)
+ .forEach(adsSubscriptionHandle -> {
+ // Notification handle used for delete notification messages.
+ notificationHandles.add(adsSubscriptionHandle.getNotificationHandle());
+ // Remove consumers
+ consumers.keySet().stream().filter(consumerRegistration ->
+ consumerRegistration.getSubscriptionHandles().contains(adsSubscriptionHandle))
+ .forEach(DefaultPlcConsumerRegistration::unregister);
+ });
+
+ List<AdsData> adsData = notificationHandles.stream()
+ .map(AdsDeleteDeviceNotificationRequest::new)
+ .collect(Collectors.toList());
+
+ List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data -> new AmsTCPPacket(
+ new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
+ configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
+ CommandId.ADS_DELETE_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());
+
+ // Start the first request-transaction (it is ended in the response-handler)
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ transaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets.iterator(), transaction));
+ return future;
+ }
+
+ private Runnable unsubscribeRecursively(PlcUnsubscriptionRequest unsubscriptionRequest,
+ CompletableFuture<PlcUnsubscriptionResponse> future,
+ Iterator<AmsTCPPacket> amsTCPPackets,
+ RequestTransactionManager.RequestTransaction transaction){
+ return () -> {
+ AmsTCPPacket packet = amsTCPPackets.next();
+ boolean hasMorePackets = amsTCPPackets.hasNext();
+ context.sendRequest(packet)
+ .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
+ .unwrap(response -> (AdsDeleteDeviceNotificationResponse) response.getUserdata().getData())
+ .handle(responseAdsData -> {
+ if (responseAdsData.getResult() == ReturnCode.OK) {
+ // After receiving the last DELETE_DEVICE_NOTIFICATION response, complete the PLC4X response.
+ if (!hasMorePackets) {
+ final PlcUnsubscriptionResponse plcUnsubscriptionResponse = new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest);
+ future.complete(plcUnsubscriptionResponse);
+ }
+ } else {
+ // TODO: this is more guesswork than knowing it could actually occur
+ if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_NOTIFYHNDINVALID) {
+ future.completeExceptionally(
+ new PlcException("The notification handle is invalid (Internal error)"));
+ } else {
+ future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
+ }
+ }
+ // Finish the request-transaction.
+ transaction.endRequest();
+
+ // Submit the next transaction.
+ if (hasMorePackets) {
+ RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
+ nextTransaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets, nextTransaction));
+ }
+ });
+ };
+ }
+
+ @Override
+ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket msg) throws Exception {
+ if (msg.getUserdata().getData() instanceof AdsDeviceNotificationRequest){
+ AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata().getData();
+ AdsStampHeader[] stamps = notificationData.getAdsStampHeaders();
+ for (int stamp=0; stamp < notificationData.getStamps(); stamp++){
+ // convert Windows FILETIME format to unix epoch
+ long unixEpochTimestamp = stamps[stamp].getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
+ AdsNotificationSample[] samples = stamps[stamp].getAdsNotificationSamples();
+ for (int smpl=0; smpl < stamps[stamp].getSamples(); smpl++){
+ long handle = samples[smpl].getNotificationHandle();
+ final AdsNotificationSample sample = samples[smpl];
+ for (DefaultPlcConsumerRegistration registration : consumers.keySet()){
+ for(PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()){
+ if (subscriptionHandle instanceof AdsSubscriptionHandle) {
+ AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
+ if (adsHandle.getNotificationHandle() == handle)
+ consumers.get(registration).accept(
+ new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
+ convertSampleToPlc4XResult(adsHandle, sample.getData())));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
+ ParseException {
+ Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
+ ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, true);
+ values.put(subscriptionHandle.getPlcFieldName(), new ResponseItem<>(PlcResponseCode.OK,
+ DataItemIO.staticParse(readBuffer, subscriptionHandle.getAdsDataType().getDataFormatName(), data.length)));
+ return values;
+ }
+
+ @Override
+ public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+ final DefaultPlcConsumerRegistration consumerRegistration =
+ new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
+ consumers.put(consumerRegistration, consumer);
+ return consumerRegistration;
+ }
+
+ @Override
+ public void unregister(PlcConsumerRegistration registration) {
+ DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) registration;
+ consumers.remove(consumerRegistration);
+ }
+
protected CompletableFuture<List<DirectAdsField>> getDirectAddresses(List<PlcField> fields) {
CompletableFuture<List<DirectAdsField>> future = new CompletableFuture<>();
diff --git a/protocols/ads/src/main/resources/protocols/ads/ads.mspec b/protocols/ads/src/main/resources/protocols/ads/ads.mspec
index 4541f7a..5870c07 100644
--- a/protocols/ads/src/main/resources/protocols/ads/ads.mspec
+++ b/protocols/ads/src/main/resources/protocols/ads/ads.mspec
@@ -281,7 +281,8 @@
// 4 bytes The ADS server checks if the value changes in this time slice. The unit is 1ms
[simple uint 32 cycleTime]
// 16bytes Must be set to 0
- [reserved uint 128 '0x0000' ]
+ [reserved uint 64 '0x0000' ]
+ [reserved uint 64 '0x0000' ]
]
['ADS_ADD_DEVICE_NOTIFICATION', 'true' AdsAddDeviceNotificationResponse
// 4 bytes ADS error number