You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/12/04 11:46:39 UTC

[plc4x] branch develop updated: feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5d4eb0a  feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
5d4eb0a is described below

commit 5d4eb0a7f32e1a959d6020214fb8f79db55e505b
Author: Richard Meister <ri...@gmail.com>
AuthorDate: Sat Dec 4 12:46:31 2021 +0100

    feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
    
    * protocols: ADS - split reserved field in two fields
    
    The AddDeviceNotification message has an 128 bit reserved field and
    sending the message throws a ParseException: "Unsigned Big Integer
    can only contain max 64 bits"
    So the reserved field is split into two 64 bit fields.
    
    * plc4j: driver-ads returns true on canSubscribe()
    
    * plc4j: add subscriptions to driver-ads
    
    Co-authored-by: Sebastian Rühl <sr...@apache.org>
---
 .../org/apache/plc4x/java/ads/ADSPlcDriver.java    |   5 +
 .../plc4x/java/ads/protocol/AdsProtocolLogic.java  | 280 ++++++++++++++++++++-
 .../ads/src/main/resources/protocols/ads/ads.mspec |   3 +-
 3 files changed, 280 insertions(+), 8 deletions(-)

diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
index 7057653..adaf58e 100644
--- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
+++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java
@@ -64,6 +64,11 @@ public class ADSPlcDriver extends GeneratedDriverBase<AmsTCPPacket> {
     }
 
     @Override
+    protected boolean canSubscribe() {
+        return true;
+    }
+
+    @Override
     protected Class<? extends Configuration> getConfigurationType() {
         return AdsConfiguration.class;
     }
diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
index 6a84d05..0c1332c 100644
--- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
+++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
@@ -20,38 +20,43 @@ package org.apache.plc4x.java.ads.protocol;
 
 import org.apache.plc4x.java.ads.configuration.AdsConfiguration;
 import org.apache.plc4x.java.ads.field.*;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
 import org.apache.plc4x.java.ads.readwrite.*;
 import org.apache.plc4x.java.ads.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.spi.configuration.HasConfiguration;
 import org.apache.plc4x.java.spi.generation.*;
-import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
-import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
+import org.apache.plc4x.java.spi.messages.*;
 import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
 import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
 import org.apache.plc4x.java.spi.values.IEC61131ValueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration> {
+public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration>, PlcSubscriber {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AdsProtocolLogic.class);
 
@@ -62,6 +67,8 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
     private final AtomicLong invokeIdGenerator = new AtomicLong(1);
     private RequestTransactionManager tm;
 
+    private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
+
     private final ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
     private final ConcurrentHashMap<SymbolicAdsField, CompletableFuture<Void>> pendingResolutionRequests;
 
@@ -550,6 +557,265 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
         return new DefaultPlcWriteResponse(writeRequest, responseCodes);
     }
 
+    @Override
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        // Get all ADS addresses in their resolved state.
+        final CompletableFuture<List<DirectAdsField>> directAdsFieldsFuture =
+            getDirectAddresses(subscriptionRequest.getFields()
+                .stream()
+                .map(field -> ((DefaultPlcSubscriptionField) field).getPlcField())
+                .collect(Collectors.toList()));
+
+        // If all addresses were already resolved we can send the request immediately.
+        if (directAdsFieldsFuture.isDone()) {
+            final List<DirectAdsField> fields = directAdsFieldsFuture.getNow(null);
+            if (fields != null) {
+                return executeSubscribe(subscriptionRequest);
+            } else {
+                final CompletableFuture<PlcSubscriptionResponse> errorFuture = new CompletableFuture<>();
+                errorFuture.completeExceptionally(new PlcException("Fields are null"));
+                return errorFuture;
+            }
+        }
+        // If there are still symbolic addresses that have to be resolved, send the
+        // request as soon as the resolution is done.
+        // In order to instantly be able to return a future, for the final result we have to
+        // create a new one which is then completed later on. Unfortunately as soon as the
+        // directAdsFieldsFuture is completed we still don't have the end result, but we can
+        // now actually send the delayed read request ... as soon as that future completes
+        // we can complete the initial one.
+        else {
+            CompletableFuture<PlcSubscriptionResponse> delayedSubscribe = new CompletableFuture<>();
+            directAdsFieldsFuture.handle((directAdsFields, throwable) -> {
+                if (directAdsFields != null) {
+                    final CompletableFuture<PlcSubscriptionResponse> delayedResponse =
+                        executeSubscribe(subscriptionRequest);
+                    delayedResponse.handle((plcSubscribeResponse, throwable1) -> {
+                        if (plcSubscribeResponse != null) {
+                            delayedSubscribe.complete(plcSubscribeResponse);
+                        } else {
+                            delayedSubscribe.completeExceptionally(throwable1);
+                        }
+                        return this;
+                    });
+                } else {
+                    delayedSubscribe.completeExceptionally(throwable);
+                }
+                return this;
+            });
+            return delayedSubscribe;
+        }
+    }
+
+    private CompletableFuture<PlcSubscriptionResponse> executeSubscribe(PlcSubscriptionRequest subscribeRequest) {
+        CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
+
+        List<AdsData> adsData = subscribeRequest.getFields().stream()
+            .map(field -> (DefaultPlcSubscriptionField) field)
+            .map(field -> new AdsAddDeviceNotificationRequest(
+                symbolicFieldMapping.get(field.getPlcField()).getIndexGroup(),
+                symbolicFieldMapping.get(field.getPlcField()).getIndexOffset(),
+                (long) ((AdsField) field.getPlcField()).getAdsDataType().getNumBytes() * field.getNumberOfElements(),
+                field.getPlcSubscriptionType() == PlcSubscriptionType.CYCLIC ? 3 : 4, // if it's not cyclic, it's on change or event
+                0 , // there is no api for that yet
+                field.getDuration().orElse(Duration.ZERO).toMillis()))
+            .collect(Collectors.toList());
+
+        List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data ->
+            new AmsTCPPacket(new AmsPacket(
+                configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
+                configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
+            CommandId.ADS_ADD_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());
+
+        Map<String, ResponseItem<PlcSubscriptionHandle>> responses = new HashMap<>();
+
+        // Start the first request-transaction (it is ended in the response-handler).
+        RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        transaction.submit(subscribeRecursively(
+            subscribeRequest,
+            subscribeRequest.getFieldNames().iterator(),
+            responses,
+            future,
+            amsTCPPackets.iterator(),
+            transaction));
+        return future;
+    }
+
+    private Runnable subscribeRecursively(PlcSubscriptionRequest subscriptionRequest, Iterator<String> fieldNames,
+                                          Map<String, ResponseItem<PlcSubscriptionHandle>> responses,
+                                          CompletableFuture<PlcSubscriptionResponse> future,
+                                          Iterator<AmsTCPPacket> amsTCPPackets,
+                                          RequestTransactionManager.RequestTransaction transaction) {
+        return () -> {
+            AmsTCPPacket packet = amsTCPPackets.next();
+            boolean hasMorePackets = amsTCPPackets.hasNext();
+            String fieldName = fieldNames.next();
+            context.sendRequest(packet)
+                .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
+                .unwrap(response -> (AdsAddDeviceNotificationResponse) response.getUserdata().getData())
+                .handle(responseAdsData -> {
+                    if (responseAdsData.getResult() == ReturnCode.OK) {
+                        // Collect notification handle from individual response.
+                        responses.put(fieldName, new ResponseItem<>(
+                            parsePlcResponseCode(responseAdsData.getResult()),
+                            new AdsSubscriptionHandle(this,
+                                fieldName,
+                                ((AdsField) ((DefaultPlcSubscriptionField) subscriptionRequest.getField(fieldName)).getPlcField()).getAdsDataType(),
+                                responseAdsData.getNotificationHandle())));
+
+                        // After receiving the last ADD_DEVICE_NOTIFICATION response, complete the PLC4X response.
+                        if (!hasMorePackets) {
+                            final PlcSubscriptionResponse plcSubscriptionResponse = new DefaultPlcSubscriptionResponse(subscriptionRequest, responses);
+                            future.complete(plcSubscriptionResponse);
+                        }
+                    } else {
+                        if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_INVALIDSIZE) {
+                            future.completeExceptionally(
+                                new PlcException("The parameter size was not correct (Internal error)"));
+                        } else {
+                            future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
+                        }
+                    }
+                    // Finish the request-transaction.
+                    transaction.endRequest();
+
+                    // Submit the next transaction.
+                    if (hasMorePackets) {
+                        RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
+                        nextTransaction.submit(subscribeRecursively(
+                            subscriptionRequest, fieldNames, responses, future, amsTCPPackets, nextTransaction));
+                    }
+                });
+        };
+    }
+
+    @Override
+    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
+
+        List<Long> notificationHandles = new ArrayList<>();
+        unsubscriptionRequest.getSubscriptionHandles().stream()
+            .filter(handle -> handle instanceof AdsSubscriptionHandle)
+            .map(handle -> (AdsSubscriptionHandle) handle)
+            .forEach(adsSubscriptionHandle -> {
+                // Notification handle used for delete notification messages.
+                notificationHandles.add(adsSubscriptionHandle.getNotificationHandle());
+                // Remove consumers
+                consumers.keySet().stream().filter(consumerRegistration ->
+                        consumerRegistration.getSubscriptionHandles().contains(adsSubscriptionHandle))
+                    .forEach(DefaultPlcConsumerRegistration::unregister);
+            });
+
+        List<AdsData> adsData = notificationHandles.stream()
+            .map(AdsDeleteDeviceNotificationRequest::new)
+            .collect(Collectors.toList());
+
+        List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data -> new AmsTCPPacket(
+            new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
+                configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
+                CommandId.ADS_DELETE_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());
+
+        // Start the first request-transaction (it is ended in the response-handler)
+        RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        transaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets.iterator(), transaction));
+        return future;
+    }
+
+    private Runnable unsubscribeRecursively(PlcUnsubscriptionRequest unsubscriptionRequest,
+                                            CompletableFuture<PlcUnsubscriptionResponse> future,
+                                            Iterator<AmsTCPPacket> amsTCPPackets,
+                                            RequestTransactionManager.RequestTransaction transaction){
+        return () -> {
+            AmsTCPPacket packet = amsTCPPackets.next();
+            boolean hasMorePackets = amsTCPPackets.hasNext();
+            context.sendRequest(packet)
+                .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
+                .unwrap(response -> (AdsDeleteDeviceNotificationResponse) response.getUserdata().getData())
+                .handle(responseAdsData -> {
+                    if (responseAdsData.getResult() == ReturnCode.OK) {
+                        // After receiving the last DELETE_DEVICE_NOTIFICATION response, complete the PLC4X response.
+                        if (!hasMorePackets) {
+                            final PlcUnsubscriptionResponse plcUnsubscriptionResponse = new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest);
+                            future.complete(plcUnsubscriptionResponse);
+                        }
+                    } else {
+                        // TODO: this is more guesswork than knowing it could actually occur
+                        if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_NOTIFYHNDINVALID) {
+                            future.completeExceptionally(
+                                new PlcException("The notification handle is invalid (Internal error)"));
+                        } else {
+                            future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
+                        }
+                    }
+                    // Finish the request-transaction.
+                    transaction.endRequest();
+
+                    // Submit the next transaction.
+                    if (hasMorePackets) {
+                        RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
+                        nextTransaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets, nextTransaction));
+                    }
+                });
+        };
+    }
+
+    @Override
+    protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket msg) throws Exception {
+        if (msg.getUserdata().getData() instanceof AdsDeviceNotificationRequest){
+            AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata().getData();
+            AdsStampHeader[] stamps = notificationData.getAdsStampHeaders();
+            for (int stamp=0; stamp < notificationData.getStamps(); stamp++){
+                // convert Windows FILETIME format to unix epoch
+                long unixEpochTimestamp = stamps[stamp].getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
+                AdsNotificationSample[] samples = stamps[stamp].getAdsNotificationSamples();
+                for (int smpl=0; smpl < stamps[stamp].getSamples(); smpl++){
+                    long handle = samples[smpl].getNotificationHandle();
+                    final AdsNotificationSample sample = samples[smpl];
+                    for (DefaultPlcConsumerRegistration registration : consumers.keySet()){
+                        for(PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()){
+                            if (subscriptionHandle instanceof AdsSubscriptionHandle) {
+                                AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
+                                if (adsHandle.getNotificationHandle() == handle)
+                                    consumers.get(registration).accept(
+                                        new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
+                                            convertSampleToPlc4XResult(adsHandle, sample.getData())));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
+        ParseException {
+        Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
+        ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, true);
+        values.put(subscriptionHandle.getPlcFieldName(), new ResponseItem<>(PlcResponseCode.OK,
+            DataItemIO.staticParse(readBuffer, subscriptionHandle.getAdsDataType().getDataFormatName(), data.length)));
+        return values;
+    }
+
+    @Override
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        final DefaultPlcConsumerRegistration consumerRegistration =
+            new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
+        consumers.put(consumerRegistration, consumer);
+        return consumerRegistration;
+    }
+
+    @Override
+    public void unregister(PlcConsumerRegistration registration) {
+        DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) registration;
+        consumers.remove(consumerRegistration);
+    }
+
     protected CompletableFuture<List<DirectAdsField>> getDirectAddresses(List<PlcField> fields) {
         CompletableFuture<List<DirectAdsField>> future = new CompletableFuture<>();
 
diff --git a/protocols/ads/src/main/resources/protocols/ads/ads.mspec b/protocols/ads/src/main/resources/protocols/ads/ads.mspec
index 4541f7a..5870c07 100644
--- a/protocols/ads/src/main/resources/protocols/ads/ads.mspec
+++ b/protocols/ads/src/main/resources/protocols/ads/ads.mspec
@@ -281,7 +281,8 @@
             // 4 bytes	The ADS server checks if the value changes in this time slice. The unit is 1ms
             [simple uint 32 cycleTime]
             // 16bytes	Must be set to 0
-            [reserved   uint       128       '0x0000' ]
+            [reserved   uint       64       '0x0000' ]
+            [reserved   uint       64       '0x0000' ]
         ]
         ['ADS_ADD_DEVICE_NOTIFICATION', 'true' AdsAddDeviceNotificationResponse
             // 4 bytes	ADS error number