You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/19 07:43:07 UTC
[incubator-plc4x] branch master updated: [ADS] added support for
multiple subscriptions.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push:
new 211f609 [ADS] added support for multiple subscriptions.
211f609 is described below
commit 211f6092f95e64e2d20eb1e542ccdd4c145f832f
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Oct 19 09:40:55 2018 +0200
[ADS] added support for multiple subscriptions.
---
.../java/ads/connection/AdsTcpPlcConnection.java | 160 ++++++++++-----------
.../messages/DefaultPlcSubscriptionRequest.java | 15 +-
.../messages/DefaultPlcSubscriptionResponse.java | 21 ++-
.../messages/InternalPlcSubscriptionRequest.java | 3 +
4 files changed, 105 insertions(+), 94 deletions(-)
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 3a78dd8..3b20fd1 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -33,7 +33,6 @@ import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
@@ -142,92 +141,87 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
@Override
public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest plcSubscriptionRequest) {
InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = checkInternal(plcSubscriptionRequest, InternalPlcSubscriptionRequest.class);
- // TODO: Make this multi-value
CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
- if (internalPlcSubscriptionRequest.getNumberOfFields() != 1) {
- throw new PlcNotImplementedException("Multirequest on subscribe not implemented yet");
- }
- SubscriptionPlcField subscriptionPlcField = internalPlcSubscriptionRequest.getSubscriptionFields().get(0);
- PlcField field = subscriptionPlcField.getPlcField();
-
- IndexGroup indexGroup;
- IndexOffset indexOffset;
- AdsDataType adsDataType;
- int numberOfElements;
- // If this is a symbolic field, it has to be resolved first.
- // TODO: This is blocking, should be changed to be async.
- if (field instanceof SymbolicAdsField) {
- mapFields((SymbolicAdsField) field);
- DirectAdsField directAdsField = fieldMapping.get(field);
- if (directAdsField == null) {
- throw new PlcRuntimeException("Unresolvable field " + field);
- }
- indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
- indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
- adsDataType = directAdsField.getAdsDataType();
- numberOfElements = directAdsField.getNumberOfElements();
- }
- // If it's no symbolic field, we can continue immediately
- // without having to do any resolving.
- else if (field instanceof DirectAdsField) {
- DirectAdsField directAdsField = (DirectAdsField) field;
- indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
- indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
- adsDataType = directAdsField.getAdsDataType();
- numberOfElements = directAdsField.getNumberOfElements();
- } else {
- throw new IllegalArgumentException("Unsupported field type " + field.getClass());
- }
+ Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getSubscriptionPlcFieldMap().entrySet().stream()
+ .map(subscriptionPlcFieldEntry -> {
+ String plcFieldName = subscriptionPlcFieldEntry.getKey();
+ SubscriptionPlcField subscriptionPlcField = subscriptionPlcFieldEntry.getValue();
+ PlcField field = subscriptionPlcField.getPlcField();
+
+ IndexGroup indexGroup;
+ IndexOffset indexOffset;
+ AdsDataType adsDataType;
+ int numberOfElements;
+ // If this is a symbolic field, it has to be resolved first.
+ // TODO: This is blocking, should be changed to be async.
+ if (field instanceof SymbolicAdsField) {
+ mapFields((SymbolicAdsField) field);
+ DirectAdsField directAdsField = fieldMapping.get(field);
+ if (directAdsField == null) {
+ throw new PlcRuntimeException("Unresolvable field " + field);
+ }
+ indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
+ indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
+ adsDataType = directAdsField.getAdsDataType();
+ numberOfElements = directAdsField.getNumberOfElements();
+ }
+ // If it's no symbolic field, we can continue immediately
+ // without having to do any resolving.
+ else if (field instanceof DirectAdsField) {
+ DirectAdsField directAdsField = (DirectAdsField) field;
+ indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
+ indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
+ adsDataType = directAdsField.getAdsDataType();
+ numberOfElements = directAdsField.getNumberOfElements();
+ } else {
+ throw new IllegalArgumentException("Unsupported field type " + field.getClass());
+ }
- final TransmissionMode transmissionMode;
- long cycleTime = 4000000;
- switch (subscriptionPlcField.getPlcSubscriptionType()) {
- case CYCLIC:
- transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
- cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
- break;
- case CHANGE_OF_STATE:
- transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
- break;
- default:
- throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType());
- }
+ final TransmissionMode transmissionMode;
+ long cycleTime = 4000000;
+ switch (subscriptionPlcField.getPlcSubscriptionType()) {
+ case CYCLIC:
+ transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
+ cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
+ break;
+ case CHANGE_OF_STATE:
+ transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
+ break;
+ default:
+ throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType());
+ }
- // Prepare the subscription request itself.
- AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
- targetAmsNetId,
- targetAmsPort,
- sourceAmsNetId,
- sourceAmsPort,
- Invoke.NONE,
- indexGroup,
- indexOffset,
- Length.of(adsDataType.getTargetByteSize() * (long) numberOfElements),
- transmissionMode,
- MaxDelay.of(0),
- CycleTime.of(cycleTime)
- );
-
- // Send the request to the plc and wait for a response
- // TODO: This is blocking, should be changed to be async.
- CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
- channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
- InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
- AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
-
- // Abort if we got anything but a successful response.
- if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
- throw new PlcRuntimeException("Error code received " + response.getResult());
- }
- AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle());
-
- Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getFieldNames()
- .stream()
- .collect(Collectors.toMap(
- fieldName -> fieldName,
- ignored -> Pair.of(PlcResponseCode.OK, adsSubscriptionHandle)
- ));
+ // Prepare the subscription request itself.
+ AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
+ targetAmsNetId,
+ targetAmsPort,
+ sourceAmsNetId,
+ sourceAmsPort,
+ Invoke.NONE,
+ indexGroup,
+ indexOffset,
+ Length.of(adsDataType.getTargetByteSize() * (long) numberOfElements),
+ transmissionMode,
+ MaxDelay.of(0),
+ CycleTime.of(cycleTime)
+ );
+
+ // Send the request to the plc and wait for a response
+ // TODO: This is blocking, should be changed to be async.
+ CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
+ channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
+ InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+ AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
+
+ // Abort if we got anything but a successful response.
+ if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
+ throw new PlcRuntimeException("Error code received " + response.getResult());
+ }
+ PlcSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle());
+ return Pair.of(plcFieldName, Pair.of(PlcResponseCode.OK, adsSubscriptionHandle));
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
future.complete(new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, responseItems));
return future;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index fd0f595..599e4ad 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -77,6 +77,11 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
}
@Override
+ public LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap() {
+ return fields;
+ }
+
+ @Override
public LinkedList<Pair<String, PlcField>> getNamedFields() {
return fields.entrySet()
.stream()
@@ -88,7 +93,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
private final PlcSubscriber subscriber;
private final PlcFieldHandler fieldHandler;
- private final Map<String, BuilderItem<Object>> fields;
+ private final Map<String, BuilderItem> fields;
public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) {
this.subscriber = subscriber;
@@ -98,19 +103,19 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
@Override
public PlcSubscriptionRequest.Builder addCyclicField(String name, String fieldQuery, Duration pollingInterval) {
- fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval));
+ fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval));
return this;
}
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateField(String name, String fieldQuery) {
- fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE));
+ fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE));
return this;
}
@Override
public PlcSubscriptionRequest.Builder addEventField(String name, String fieldQuery) {
- fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.EVENT));
+ fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.EVENT));
return this;
}
@@ -125,7 +130,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
return new DefaultPlcSubscriptionRequest(subscriber, parsedFields);
}
- private static class BuilderItem<T> {
+ private static class BuilderItem {
private final String fieldQuery;
private final PlcSubscriptionType plcSubscriptionType;
private final Duration duration;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
index a8924e9..47a5ce2 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
@@ -42,8 +43,14 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe
@Override
public PlcSubscriptionHandle getSubscriptionHandle(String name) {
- // TODO: add safety
- return values.get(name).getValue();
+ Pair<PlcResponseCode, PlcSubscriptionHandle> response = values.get(name);
+ if (response == null) {
+ return null;
+ }
+ if (response.getKey() != PlcResponseCode.OK) {
+ throw new PlcRuntimeException("Item " + name + " failed to subscribe: " + response.getKey());
+ }
+ return response.getValue();
}
@Override
@@ -53,14 +60,16 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe
@Override
public PlcField getField(String name) {
- // TODO: or should subscription handle be a successor of PlcField?
- throw new NotImplementedException("field access not implemented");
+ throw new NotImplementedException("field access not possible as these come async");
}
@Override
public PlcResponseCode getResponseCode(String name) {
- // TODO: add safety
- return values.get(name).getKey();
+ Pair<PlcResponseCode, PlcSubscriptionHandle> response = values.get(name);
+ if (response == null) {
+ return null;
+ }
+ return response.getKey();
}
@Override
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 608691a..6dcb30d 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -21,9 +21,12 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.base.model.SubscriptionPlcField;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
LinkedList<SubscriptionPlcField> getSubscriptionFields();
+
+ LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap();
}