You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/01 12:40:12 UTC

[plc4x] branch develop updated (8334e20f4 -> 8b8518cd7)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 8334e20f4 fix(plc4go/cbus): fix filtering
     new 119bcf848 doc(plc4): refined java doc regarding subscription
     new 8b8518cd7 feat(plc4j): added addPreRegisteredConsumer as convenience method for pre register consumers

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/messages/PlcSubscriptionFieldRequest.java  |  4 ++
 .../java/api/messages/PlcSubscriptionRequest.java  | 15 ++++++
 .../java/api/messages/PlcSubscriptionResponse.java |  9 ++++
 .../api/messages/PlcUnsubscriptionRequest.java     | 16 ++++--
 .../plc4x/java/ads/protocol/AdsProtocolLogic.java  | 62 +++++++++++-----------
 .../bacnetip/protocol/BacNetIpProtocolLogic.java   |  5 +-
 .../generic/protocol/GenericCANProtocolLogic.java  |  1 +
 .../canopen/protocol/CANOpenProtocolLogic.java     |  4 +-
 .../plc4x/java/mock/connection/MockConnection.java | 18 +++----
 .../java/org/apache/plc4x/camel/MockDriver.java    |  4 +-
 .../messages/DefaultPlcSubscriptionRequest.java    | 36 ++++++++++---
 .../messages/DefaultPlcSubscriptionResponse.java   |  9 +++-
 12 files changed, 125 insertions(+), 58 deletions(-)


[plc4x] 02/02: feat(plc4j): added addPreRegisteredConsumer as convenience method for pre register consumers

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8b8518cd7b4d25e1fb3794d0af0005d0c850ea21
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 1 14:40:03 2022 +0200

    feat(plc4j): added addPreRegisteredConsumer as convenience method for pre register consumers
---
 .../api/messages/PlcSubscriptionFieldRequest.java  |  4 ++
 .../java/api/messages/PlcSubscriptionRequest.java  | 15 ++++++
 .../plc4x/java/ads/protocol/AdsProtocolLogic.java  | 62 +++++++++++-----------
 .../bacnetip/protocol/BacNetIpProtocolLogic.java   |  5 +-
 .../generic/protocol/GenericCANProtocolLogic.java  |  1 +
 .../canopen/protocol/CANOpenProtocolLogic.java     |  4 +-
 .../plc4x/java/mock/connection/MockConnection.java | 18 +++----
 .../java/org/apache/plc4x/camel/MockDriver.java    |  4 +-
 .../messages/DefaultPlcSubscriptionRequest.java    | 36 ++++++++++---
 .../messages/DefaultPlcSubscriptionResponse.java   |  9 +++-
 10 files changed, 103 insertions(+), 55 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionFieldRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionFieldRequest.java
index 648f2e686..1d9483e2d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionFieldRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionFieldRequest.java
@@ -22,7 +22,9 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionField;
 
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 public interface PlcSubscriptionFieldRequest extends PlcRequest {
 
@@ -37,4 +39,6 @@ public interface PlcSubscriptionFieldRequest extends PlcRequest {
 
     List<PlcSubscriptionField> getFields();
 
+    Map<String, List<Consumer<PlcSubscriptionEvent>>> getPreRegisteredConsumers();
+
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 955a3de78..e06291046 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.plc4x.java.api.messages;
 
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 public interface PlcSubscriptionRequest extends PlcSubscriptionFieldRequest {
 
@@ -63,6 +66,18 @@ public interface PlcSubscriptionRequest extends PlcSubscriptionFieldRequest {
          */
         PlcSubscriptionRequest.Builder addEventField(String name, String fieldQuery);
 
+        /**
+         * Convenience method which attaches the {@link Consumer} of {@link PlcSubscriptionEvent} directly to the
+         * handles once the request succeeds.
+         * Note: as opposed to registering on the {@link org.apache.plc4x.java.api.model.PlcSubscriptionHandle}
+         * directly, you won't receive a {@link PlcConsumerRegistration}, which is useful for cancelling registrations.
+         *
+         * @param name     alias of the field.
+         * @param preRegisteredConsumer {@link Consumer} of {@link PlcSubscriptionEvent} to be attached
+         * @return builder.
+         */
+        PlcSubscriptionRequest.Builder addPreRegisteredConsumer(String name, Consumer<PlcSubscriptionEvent> preRegisteredConsumer);
+
     }
 
 }
diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
index abe27b7f0..8a8bf8989 100644
--- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
+++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java
@@ -74,7 +74,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
 
     private final Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
 
-//    private final ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
+    //    private final ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
     private final ConcurrentHashMap<SymbolicAdsField, CompletableFuture<Void>> pendingResolutionRequests;
 
     private final Map<String, AdsSymbolTableEntry> symbolTable;
@@ -110,8 +110,8 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
 
         // If we have connection credentials available, try to set up the AMS routes.
         CompletableFuture<Void> setupAmsRouteFuture;
-        if(context.getAuthentication() != null) {
-            if(!(context.getAuthentication() instanceof PlcUsernamePasswordAuthentication)) {
+        if (context.getAuthentication() != null) {
+            if (!(context.getAuthentication() instanceof PlcUsernamePasswordAuthentication)) {
                 future.completeExceptionally(new PlcConnectionException(
                     "This type of connection only supports username-password authentication"));
                 return;
@@ -127,7 +127,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
         // If the configuration asks us to load the symbol and data type tables, do so,
         // otherwise just mark the connection as completed instantly.
         setupAmsRouteFuture.whenComplete((unused, throwable) -> {
-            if(configuration.isLoadSymbolAndDataTypeTables()) {
+            if (configuration.isLoadSymbolAndDataTypeTables()) {
                 LOGGER.debug("Fetching sizes of symbol and datatype table sizes.");
                 CompletableFuture<Void> readSymbolTableFuture = readSymbolTableAndDatatypeTable(context);
                 readSymbolTableFuture.whenComplete((unused2, throwable2) -> {
@@ -319,7 +319,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
             // Add the type itself.
             values.add(new DefaultPlcBrowseItem(symbol.getName(), symbol.getDataTypeName()));
             AdsDataTypeTableEntry dataType = dataTypeTable.get(symbol.getDataTypeName());
-            if(dataType == null) {
+            if (dataType == null) {
                 System.out.printf("couldn't find datatype: %s%n", symbol.getDataTypeName());
                 continue;
             }
@@ -332,7 +332,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
     }
 
     protected List<PlcBrowseItem> getBrowseItems(String basePath, long baseGroupId, long baseOffset, AdsDataTypeTableEntry dataType) {
-        if(dataType.getNumChildren() == 0) {
+        if (dataType.getNumChildren() == 0) {
             return Collections.emptyList();
         }
 
@@ -340,7 +340,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
         for (AdsDataTypeTableChildEntry child : dataType.getChildren()) {
             values.add(new DefaultPlcBrowseItem(basePath + "." + child.getPropertyName(), child.getDataTypeName()));
             AdsDataTypeTableEntry childDataType = dataTypeTable.get(child.getDataTypeName());
-            if(childDataType == null) {
+            if (childDataType == null) {
                 System.out.printf("couldn't find datatype: %s%n", child.getDataTypeName());
                 continue;
             }
@@ -418,7 +418,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
 
         AmsPacket amsPacket = new AdsReadRequest(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
             configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(), 0, getInvokeId(),
-            directAdsField.getIndexGroup(), directAdsField.getIndexOffset(),size * directAdsField.getNumberOfElements());
+            directAdsField.getIndexGroup(), directAdsField.getIndexOffset(), size * directAdsField.getNumberOfElements());
         AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
 
         // Start a new request-transaction (Is ended in the response-handler)
@@ -463,13 +463,13 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
             configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
             0, getInvokeId(), ReservedIndexGroups.ADSIGRP_MULTIPLE_READ.getValue(), directAdsFields.size(),
             expectedResponseDataSize, directAdsFields.stream().map(directAdsField -> {
-                String dataTypeName = directAdsField.getAdsDataTypeName();
-                AdsDataTypeTableEntry adsDataTypeTableEntry = dataTypeTable.get(dataTypeName);
-                long size = adsDataTypeTableEntry.getSize();
-                return new AdsMultiRequestItemRead(
-                        directAdsField.getIndexGroup(), directAdsField.getIndexOffset(),
-                        (size * directAdsField.getNumberOfElements()));
-            }).collect(Collectors.toList()), null);
+            String dataTypeName = directAdsField.getAdsDataTypeName();
+            AdsDataTypeTableEntry adsDataTypeTableEntry = dataTypeTable.get(dataTypeName);
+            long size = adsDataTypeTableEntry.getSize();
+            return new AdsMultiRequestItemRead(
+                directAdsField.getIndexGroup(), directAdsField.getIndexOffset(),
+                (size * directAdsField.getNumberOfElements()));
+        }).collect(Collectors.toList()), null);
         AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
 
         // Start a new request-transaction (Is ended in the response-handler)
@@ -525,7 +525,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
             Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
             for (String fieldName : readRequest.getFieldNames()) {
                 DirectAdsField field;
-                if(readRequest.getField(fieldName) instanceof DirectAdsField) {
+                if (readRequest.getField(fieldName) instanceof DirectAdsField) {
                     field = (DirectAdsField) readRequest.getField(fieldName);
                 } else {
                     SymbolicAdsField symbolicAdsField = (SymbolicAdsField) readRequest.getField(fieldName);
@@ -592,9 +592,9 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
                 int startPos = readBuffer.getPos();
                 int curPos = 0;
                 for (AdsDataTypeTableChildEntry child : adsDataTypeTableEntry.getChildren()) {
-                    if(child.getOffset() > curPos) {
+                    if (child.getOffset() > curPos) {
                         long skipBytes = child.getOffset() - curPos;
-                        for(long i = 0; i < skipBytes; i++) {
+                        for (long i = 0; i < skipBytes; i++) {
                             readBuffer.readByte();
                         }
                     }
@@ -615,13 +615,13 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
 
     private PlcValue parseArrayLevel(AdsDataTypeTableEntry adsDataTypeTableEntry, List<AdsDataTypeArrayInfo> arrayLayers, ReadBuffer readBuffer) throws ParseException {
         // If this is the last layer of the Array, parse the values themselves.
-        if(arrayLayers.isEmpty()) {
+        if (arrayLayers.isEmpty()) {
             String dataTypeName = adsDataTypeTableEntry.getDataTypeName();
             dataTypeName = dataTypeName.substring(dataTypeName.lastIndexOf(" OF ") + 4);
             int stringLength = 0;
-            if(dataTypeName.startsWith("STRING(")) {
+            if (dataTypeName.startsWith("STRING(")) {
                 stringLength = Integer.parseInt(dataTypeName.substring(7, dataTypeName.length() - 1));
-            } else if(dataTypeName.startsWith("WSTRING(")) {
+            } else if (dataTypeName.startsWith("WSTRING(")) {
                 stringLength = Integer.parseInt(dataTypeName.substring(8, dataTypeName.length() - 1));
             }
             AdsDataTypeTableEntry elementDataTypeTableEntry = dataTypeTable.get(dataTypeName);
@@ -632,7 +632,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
         List<PlcValue> elements = new ArrayList<>();
         List<AdsDataTypeArrayInfo> arrayInfo = adsDataTypeTableEntry.getArrayInfo();
         AdsDataTypeArrayInfo firstLayer = arrayInfo.get(0);
-        for(int i = 0; i < firstLayer.getNumElements(); i++) {
+        for (int i = 0; i < firstLayer.getNumElements(); i++) {
             List<AdsDataTypeArrayInfo> remainingLayers = arrayInfo.subList(1, arrayInfo.size());
             elements.add(parseArrayLevel(adsDataTypeTableEntry, remainingLayers, readBuffer));
         }
@@ -1299,7 +1299,7 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
     }
 
     protected DirectAdsField getDirectAdsFieldForSymbolicName(PlcField field) {
-        if(field instanceof DirectAdsField) {
+        if (field instanceof DirectAdsField) {
             return (DirectAdsField) field;
         }
 
@@ -1308,9 +1308,9 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
         String[] addressParts = symbolicAddress.split("\\.");
 
         // If the number of parts are less than 2, we can find the entry in the symbol table directly.
-        if(addressParts.length < 2) {
+        if (addressParts.length < 2) {
             // We can't find it, so we need to resolve it.
-            if(!symbolTable.containsKey(symbolicAddress)) {
+            if (!symbolTable.containsKey(symbolicAddress)) {
                 return null;
             }
 
@@ -1331,14 +1331,14 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
     }
 
     protected DirectAdsField resolveDirectAdsFieldForSymbolicNameFromDataType(List<String> remainingAddressParts, long currentGroup, long currentOffset, AdsDataTypeTableEntry adsDataTypeTableEntry) {
-        if(remainingAddressParts.isEmpty()) {
+        if (remainingAddressParts.isEmpty()) {
             // TODO: Implement the Array support
             return new DirectAdsField(currentGroup, currentOffset, adsDataTypeTableEntry.getDataTypeName(), null);
         }
 
         // Go through all children looking for a matching one.
         for (AdsDataTypeTableChildEntry child : adsDataTypeTableEntry.getChildren()) {
-            if(child.getPropertyName().equals(remainingAddressParts.get(0))) {
+            if (child.getPropertyName().equals(remainingAddressParts.get(0))) {
                 AdsDataTypeTableEntry childAdsDataTypeTableEntry = dataTypeTable.get(child.getDataTypeName());
                 return resolveDirectAdsFieldForSymbolicNameFromDataType(
                     remainingAddressParts.subList(1, remainingAddressParts.size()),
@@ -1352,9 +1352,9 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
 
     protected PlcValueType getPlcValueTypeForAdsDataType(AdsDataTypeTableEntry dataTypeTableEntry) {
         String dataTypeName = dataTypeTableEntry.getDataTypeName();
-        if(dataTypeName.startsWith("STRING(")) {
+        if (dataTypeName.startsWith("STRING(")) {
             dataTypeName = "STRING";
-        } else if(dataTypeName.startsWith("WSTRING(")) {
+        } else if (dataTypeName.startsWith("WSTRING(")) {
             dataTypeName = "WSTRING";
         }
         // First check, if this is a primitive type.
@@ -1362,11 +1362,11 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
             return PlcValueType.valueOf(dataTypeName);
         } catch (IllegalArgumentException e) {
             // Then check if this is an array.
-            if(dataTypeTableEntry.getArrayDimensions() > 0) {
+            if (dataTypeTableEntry.getArrayDimensions() > 0) {
                 return PlcValueType.List;
             }
             return PlcValueType.Struct;
-       }
+        }
     }
 
     protected byte[] getNullByteTerminatedArray(String value) {
diff --git a/plc4j/drivers/bacnet/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java b/plc4j/drivers/bacnet/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java
index 60fb8dc70..0cdc6037d 100644
--- a/plc4j/drivers/bacnet/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java
+++ b/plc4j/drivers/bacnet/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java
@@ -203,7 +203,7 @@ public class BacNetIpProtocolLogic extends Plc4xProtocolBase<BVLC> implements Ha
             for (BACnetPropertyValue baCnetPropertyValue : valueChange.getListOfValues().getData()) {
                 // These are value change notifications. Ignore the rest.
                 if (baCnetPropertyValue.getPropertyIdentifier().getValue() == BACnetPropertyIdentifier.PRESENT_VALUE) {
-                    BACnetApplicationTag baCnetTag = ((BACnetConstructedDataUnspecified)baCnetPropertyValue.getPropertyValue().getConstructedData()).getData().get(0).getApplicationTag();
+                    BACnetApplicationTag baCnetTag = ((BACnetConstructedDataUnspecified) baCnetPropertyValue.getPropertyValue().getConstructedData()).getData().get(0).getApplicationTag();
 
                     // Initialize an enriched version of the PlcStruct.
                     final Map<String, PlcValue> enrichedPlcValue = new HashMap<>();
@@ -247,8 +247,7 @@ public class BacNetIpProtocolLogic extends Plc4xProtocolBase<BVLC> implements Ha
         for (String fieldName : subscriptionRequest.getFieldNames()) {
             values.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
         }
-        return CompletableFuture.completedFuture(
-            new DefaultPlcSubscriptionResponse(subscriptionRequest, values));
+        return CompletableFuture.completedFuture(new DefaultPlcSubscriptionResponse(subscriptionRequest, values));
     }
 
     @Override
diff --git a/plc4j/drivers/can/src/main/java/org/apache/plc4x/java/can/generic/protocol/GenericCANProtocolLogic.java b/plc4j/drivers/can/src/main/java/org/apache/plc4x/java/can/generic/protocol/GenericCANProtocolLogic.java
index 93e9f81f1..dc3473914 100644
--- a/plc4j/drivers/can/src/main/java/org/apache/plc4x/java/can/generic/protocol/GenericCANProtocolLogic.java
+++ b/plc4j/drivers/can/src/main/java/org/apache/plc4x/java/can/generic/protocol/GenericCANProtocolLogic.java
@@ -19,6 +19,7 @@
 package org.apache.plc4x.java.can.generic.protocol;
 
 import java.util.Map.Entry;
+
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcField;
diff --git a/plc4j/drivers/canopen/src/main/java/org/apache/plc4x/java/canopen/protocol/CANOpenProtocolLogic.java b/plc4j/drivers/canopen/src/main/java/org/apache/plc4x/java/canopen/protocol/CANOpenProtocolLogic.java
index 1cce27c84..cb2c9bdca 100644
--- a/plc4j/drivers/canopen/src/main/java/org/apache/plc4x/java/canopen/protocol/CANOpenProtocolLogic.java
+++ b/plc4j/drivers/canopen/src/main/java/org/apache/plc4x/java/canopen/protocol/CANOpenProtocolLogic.java
@@ -246,7 +246,7 @@ public class CANOpenProtocolLogic extends Plc4xCANProtocolBase<CANOpenFrame>
         if (!(field instanceof CANOpenSDOField)) {
             response.completeExceptionally(new IllegalArgumentException("Only CANOpenSDOField instances are supported"));
             return response;
-        };
+        }
 
         readInternally(readRequest, (CANOpenSDOField) field, response);
         return response;
@@ -436,7 +436,7 @@ public class CANOpenProtocolLogic extends Plc4xCANProtocolBase<CANOpenFrame>
 
     @Override
     public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
-        final DefaultPlcConsumerRegistration consumerRegistration =new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new DefaultPlcSubscriptionHandle[0]));
+        final DefaultPlcConsumerRegistration consumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new DefaultPlcSubscriptionHandle[0]));
         consumers.put(consumerRegistration, consumer);
         return consumerRegistration;
     }
diff --git a/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/connection/MockConnection.java b/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/connection/MockConnection.java
index 1704c3ac6..e69326464 100644
--- a/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/connection/MockConnection.java
+++ b/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/connection/MockConnection.java
@@ -137,8 +137,8 @@ public class MockConnection implements PlcConnection, PlcReader, PlcWriter, PlcS
             LOGGER.debug("Sending read request to MockDevice");
             Map<String, ResponseItem<PlcValue>> response = readRequest.getFieldNames().stream()
                 .collect(Collectors.toMap(
-                    Function.identity(),
-                    name -> device.read(((MockField) readRequest.getField(name)).getAddress())
+                        Function.identity(),
+                        name -> device.read(((MockField) readRequest.getField(name)).getAddress())
                     )
                 );
             return new DefaultPlcReadResponse((DefaultPlcReadRequest) readRequest, response);
@@ -152,9 +152,9 @@ public class MockConnection implements PlcConnection, PlcReader, PlcWriter, PlcS
             LOGGER.debug("Sending write request to MockDevice");
             Map<String, PlcResponseCode> response = writeRequest.getFieldNames().stream()
                 .collect(Collectors.toMap(
-                    Function.identity(),
-                    name -> device.write(((MockField) writeRequest.getField(name)).getAddress(),
-                        ((MockField) writeRequest.getField(name)).getPlcValue())
+                        Function.identity(),
+                        name -> device.write(((MockField) writeRequest.getField(name)).getAddress(),
+                            ((MockField) writeRequest.getField(name)).getPlcValue())
                     )
                 );
             return new DefaultPlcWriteResponse((DefaultPlcWriteRequest) writeRequest, response);
@@ -168,11 +168,11 @@ public class MockConnection implements PlcConnection, PlcReader, PlcWriter, PlcS
             LOGGER.debug("Sending subsribe request to MockDevice");
             Map<String, ResponseItem<PlcSubscriptionHandle>> response = subscriptionRequest.getFieldNames().stream()
                 .collect(Collectors.toMap(
-                    Function.identity(),
-                    name -> device.subscribe(((MockField) subscriptionRequest.getField(name)).getAddress())
+                        Function.identity(),
+                        name -> device.subscribe(((MockField) subscriptionRequest.getField(name)).getAddress())
                     )
                 );
-            return new DefaultPlcSubscriptionResponse((DefaultPlcSubscriptionRequest) subscriptionRequest, response);
+            return new DefaultPlcSubscriptionResponse(subscriptionRequest, response);
         });
     }
 
@@ -182,7 +182,7 @@ public class MockConnection implements PlcConnection, PlcReader, PlcWriter, PlcS
             Validate.notNull(device, "No device is set in the mock connection!");
             LOGGER.debug("Sending subsribe request to MockDevice");
             device.unsubscribe();
-            return new DefaultPlcUnsubscriptionResponse((DefaultPlcUnsubscriptionRequest) unsubscriptionRequest);
+            return new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest);
         });
     }
 
diff --git a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index 7f98949cf..542b1132f 100644
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -88,9 +88,7 @@ public class MockDriver implements PlcDriver {
                 }).collect(Collectors.toList());
             PlcSubscriptionResponse response = new PlcSubscriptionResponse(subscriptionRequest, responseItems);*/
             PlcSubscriptionResponse response = new DefaultPlcSubscriptionResponse(mock(PlcSubscriptionRequest.class), new HashMap<>());
-            CompletableFuture<PlcSubscriptionResponse> responseFuture = new CompletableFuture<>();
-            responseFuture.complete(response);
-            return responseFuture;
+            return CompletableFuture.completedFuture(response);
         });
         return plcConnectionMock;
     }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
index b80f106fa..f3d648f5a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
@@ -24,22 +24,22 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionField;
 import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 import org.apache.plc4x.java.spi.connection.PlcFieldHandler;
-import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.SerializationException;
 import org.apache.plc4x.java.spi.generation.WriteBuffer;
 import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
 import org.apache.plc4x.java.spi.utils.Serializable;
-import org.w3c.dom.Element;
 
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "className")
@@ -49,11 +49,15 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
 
     private final LinkedHashMap<String, PlcSubscriptionField> fields;
 
+    private final LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers;
+
     @JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
     public DefaultPlcSubscriptionRequest(@JsonProperty("subscriber") PlcSubscriber subscriber,
-                                         @JsonProperty("fields") LinkedHashMap<String, PlcSubscriptionField> fields) {
+                                         @JsonProperty("fields") LinkedHashMap<String, PlcSubscriptionField> fields,
+                                         @JsonProperty("preRegisteredConsumers") LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers) {
         this.subscriber = subscriber;
         this.fields = fields;
+        this.preRegisteredConsumers = preRegisteredConsumers;
     }
 
     @Override
@@ -86,6 +90,12 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
         return new ArrayList<>(fields.values());
     }
 
+    @Override
+    @JsonIgnore
+    public Map<String, List<Consumer<PlcSubscriptionEvent>>> getPreRegisteredConsumers() {
+        return new LinkedHashMap<>(preRegisteredConsumers);
+    }
+
     @JsonIgnore
     public List<Pair<String, PlcSubscriptionField>> getNamedFields() {
         return fields.entrySet()
@@ -107,7 +117,7 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
             String fieldName = fieldEntry.getKey();
             writeBuffer.pushContext(fieldName);
             PlcField field = fieldEntry.getValue();
-            if(!(field instanceof Serializable)) {
+            if (!(field instanceof Serializable)) {
                 throw new RuntimeException("Error serializing. Field doesn't implement XmlSerializable");
             }
             ((Serializable) field).serialize(writeBuffer);
@@ -123,11 +133,13 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
         private final PlcSubscriber subscriber;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, BuilderItem> fields;
+        private final LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers;
 
         public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) {
             this.subscriber = subscriber;
             this.fieldHandler = fieldHandler;
-            fields = new TreeMap<>();
+            this.fields = new TreeMap<>();
+            this.preRegisteredConsumers = new LinkedHashMap<>();
         }
 
         @Override
@@ -151,6 +163,13 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
             return this;
         }
 
+        @Override
+        public PlcSubscriptionRequest.Builder addPreRegisteredConsumer(String name, Consumer<PlcSubscriptionEvent> consumer) {
+            preRegisteredConsumers.putIfAbsent(name, new LinkedList<>());
+            preRegisteredConsumers.get(name).add(consumer);
+            return this;
+        }
+
         @Override
         public PlcSubscriptionRequest build() {
             LinkedHashMap<String, PlcSubscriptionField> parsedFields = new LinkedHashMap<>();
@@ -159,7 +178,12 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
                 PlcField parsedField = fieldHandler.createField(builderItem.fieldQuery);
                 parsedFields.put(name, new DefaultPlcSubscriptionField(builderItem.plcSubscriptionType, parsedField, builderItem.duration));
             });
-            return new DefaultPlcSubscriptionRequest(subscriber, parsedFields);
+            preRegisteredConsumers.forEach((fieldName, ignored) -> {
+                if (!fields.containsKey(fieldName)) {
+                    throw new RuntimeException("fieldName " + fieldName + "for preRegisteredConsumer not found");
+                }
+            });
+            return new DefaultPlcSubscriptionRequest(subscriber, parsedFields, preRegisteredConsumers);
         }
 
         private static class BuilderItem {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionResponse.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionResponse.java
index 9a3064490..bace3bf00 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionResponse.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionResponse.java
@@ -53,6 +53,13 @@ public class DefaultPlcSubscriptionResponse implements PlcSubscriptionResponse,
                                           @JsonProperty("values") Map<String, ResponseItem<PlcSubscriptionHandle>> values) {
         this.request = request;
         this.values = values;
+        request.getPreRegisteredConsumers().forEach((subscriptionFieldName, consumers) -> {
+            PlcSubscriptionHandle subscriptionHandle = getSubscriptionHandle(subscriptionFieldName);
+            if (subscriptionHandle == null) {
+                throw new PlcRuntimeException("PlcSubscriptionHandle for " + subscriptionFieldName + " not found");
+            }
+            consumers.forEach(subscriptionHandle::register);
+        });
     }
 
     @Override
@@ -109,7 +116,7 @@ public class DefaultPlcSubscriptionResponse implements PlcSubscriptionResponse,
     public void serialize(WriteBuffer writeBuffer) throws SerializationException {
         writeBuffer.pushContext("PlcSubscriptionResponse");
 
-        if(request instanceof Serializable) {
+        if (request instanceof Serializable) {
             ((Serializable) request).serialize(writeBuffer);
         }
         writeBuffer.pushContext("values");


[plc4x] 01/02: doc(plc4): refined java doc regarding subscription

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 119bcf84871a484afa31628fabf1017ab7d8b0b6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 1 13:28:43 2022 +0200

    doc(plc4): refined java doc regarding subscription
---
 .../plc4x/java/api/messages/PlcSubscriptionResponse.java |  9 +++++++++
 .../java/api/messages/PlcUnsubscriptionRequest.java      | 16 +++++++++++++---
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
index 90ca60825..a83ffaeaf 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
@@ -27,8 +27,17 @@ public interface PlcSubscriptionResponse extends PlcSubscriptionFieldResponse {
     @Override
     PlcSubscriptionRequest getRequest();
 
+    /**
+     * Returns a {@link PlcSubscriptionHandle} associated with a {@code name} from {@link PlcSubscriptionRequest#getField(String)}
+     *
+     * @param name the field name for which the {@link PlcSubscriptionHandle} is requested
+     * @return a {@link PlcSubscriptionHandle}
+     */
     PlcSubscriptionHandle getSubscriptionHandle(String name);
 
+    /**
+     * @return all {@link PlcSubscriptionHandle}s
+     */
     Collection<PlcSubscriptionHandle> getSubscriptionHandles();
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index 56abed3f2..cb4a77856 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -37,15 +37,25 @@ public interface PlcUnsubscriptionRequest extends PlcRequest {
         PlcUnsubscriptionRequest build();
 
         /**
-         * TODO document me:
+         * {@link PlcSubscriptionHandle} that should be removed from the subscription
          *
-         * @param plcSubscriptionHandle
-         * @return
+         * @param plcSubscriptionHandle {@link PlcSubscriptionHandle} to be removed
          */
         PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle plcSubscriptionHandle);
 
+        /**
+         * {@link PlcSubscriptionHandle}s that should be removed from the subscription
+         *
+         * @param plcSubscriptionHandle1 {@link PlcSubscriptionHandle} to be removed
+         * @param plcSubscriptionHandles further {@link PlcSubscriptionHandle}s to be removed
+         */
         PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle plcSubscriptionHandle1, PlcSubscriptionHandle... plcSubscriptionHandles);
 
+        /**
+         * {@link PlcSubscriptionHandle}s that should be removed from the subscription
+         *
+         * @param plcSubscriptionHandle collection of {@link PlcSubscriptionHandle}s to be removed
+         */
         PlcUnsubscriptionRequest.Builder addHandles(Collection<PlcSubscriptionHandle> plcSubscriptionHandle);
     }