You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/18 13:20:23 UTC
[incubator-plc4x] 01/02: [GENERAL] updated subscription api after big refactorings
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 514719c971214bd660440045cc900f0690f1a9e6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Oct 18 15:08:20 2018 +0200
[GENERAL] updated subscription api after big refactorings
---
.../api/messages/PlcUnsubscriptionRequest.java | 4 +-
.../api/messages/PlcUnsubscriptionResponse.java | 2 +-
.../java/api/model/PlcConsumerRegistration.java | 1 +
.../java/api/model/PlcSubscriptionHandle.java | 5 ++
.../java/ads/connection/AdsTcpPlcConnection.java | 30 +++++-----
.../java/ads/model/AdsSubscriptionHandle.java | 8 ++-
.../plc4x/java/ads/protocol/Plc4x2AdsProtocol.java | 5 +-
.../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 40 +++++++++----
.../messages/DefaultPlcSubscriptionRequest.java | 65 +++++++++++++---------
.../messages/DefaultPlcUnsubscriptionRequest.java | 35 ++----------
.../messages/DefaultPlcUnsubscriptionResponse.java | 19 -------
.../messages/InternalPlcSubscriptionRequest.java | 7 ++-
.../messages/InternalPlcUnsubscriptionRequest.java | 2 +-
.../plc4x/java/base/messages/PlcSubscriber.java | 2 -
.../base/model/DefaultPlcConsumerRegistration.java | 14 ++++-
...tion.java => DefaultPlcSubscriptionHandle.java} | 39 ++++++-------
.../java/base/model/SubscriptionPlcField.java | 55 ++++++++++++++++++
.../SingleItemToSingleRequestProtocolTest.java | 10 ++--
18 files changed, 197 insertions(+), 146 deletions(-)
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index 8b95f44..74caccd 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-public interface PlcUnsubscriptionRequest extends PlcFieldRequest {
+public interface PlcUnsubscriptionRequest extends PlcRequest {
@Override
- CompletableFuture<? extends PlcUnsubscriptionResponse> execute();
+ CompletableFuture<PlcUnsubscriptionResponse> execute();
interface Builder extends PlcRequestBuilder {
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index 4205e8b..705ee8d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.plc4x.java.api.messages;
-public interface PlcUnsubscriptionResponse extends PlcFieldResponse {
+public interface PlcUnsubscriptionResponse extends PlcResponse {
@Override
PlcUnsubscriptionRequest getRequest();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
index 0f3e65b..a0a8c52 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
@@ -20,4 +20,5 @@
package org.apache.plc4x.java.api.model;
public interface PlcConsumerRegistration {
+ void unregister();
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
index d295e9e..a9338ed 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
@@ -18,6 +18,10 @@ under the License.
*/
package org.apache.plc4x.java.api.model;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+
+import java.util.function.Consumer;
+
/**
* When subscribing to remote resources, depending on the used protocol
* different data is used to identify a subscription. This interface is
@@ -29,4 +33,5 @@ package org.apache.plc4x.java.api.model;
*/
public interface PlcSubscriptionHandle {
+ PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer);
}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 406557f..d4b2364 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -32,7 +32,6 @@ import org.apache.plc4x.java.ads.model.*;
import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.base.messages.PlcSubscriber;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
@@ -43,8 +42,10 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.base.model.InternalPlcConsumerRegistration;
import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,10 +53,8 @@ import org.slf4j.LoggerFactory;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -146,7 +145,8 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
throw new PlcNotImplementedException("Multirequest on subscribe not implemented yet");
}
- PlcField field = internalPlcSubscriptionRequest.getFields().get(0);
+ SubscriptionPlcField subscriptionPlcField = internalPlcSubscriptionRequest.getSubscriptionFields().get(0);
+ PlcField field = subscriptionPlcField.getPlcField();
IndexGroup indexGroup;
IndexOffset indexOffset;
@@ -178,15 +178,17 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
}
final TransmissionMode transmissionMode;
- switch (internalPlcSubscriptionRequest.getPlcSubscriptionType()) {
+ long cycleTime = 4000000;
+ switch (subscriptionPlcField.getPlcSubscriptionType()) {
case CYCLIC:
transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
+ cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
break;
case CHANGE_OF_STATE:
transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
break;
default:
- throw new PlcRuntimeException("Unmapped type " + internalPlcSubscriptionRequest.getPlcSubscriptionType());
+ throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType());
}
// Prepare the subscription request itself.
@@ -201,7 +203,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
Length.of(adsDataType.getTargetByteSize() * numberOfElements),
transmissionMode,
MaxDelay.of(0),
- CycleTime.of(4000000)
+ CycleTime.of(cycleTime)
);
// Send the request to the plc and wait for a response
@@ -215,7 +217,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
throw new PlcRuntimeException("Error code received " + response.getResult());
}
- AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(response.getNotificationHandle());
+ AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle());
Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getFieldNames()
.stream()
@@ -262,11 +264,9 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
- return register(consumer, handles);
+ return register(consumer, handles.toArray(new PlcSubscriptionHandle[0]));
}
- // TODO: figure out what this is
- /*@Override
public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, PlcSubscriptionHandle... handles) {
Objects.requireNonNull(consumer);
Objects.requireNonNull(handles);
@@ -275,7 +275,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
internalPlcSubscriptionHandles[i] = checkInternal(handles[i], InternalPlcSubscriptionHandle.class);
}
- InternalPlcConsumerRegistration internalPlcConsumerRegistration = new DefaultPlcConsumerRegistration(consumer, internalPlcSubscriptionHandles);
+ InternalPlcConsumerRegistration internalPlcConsumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, internalPlcSubscriptionHandles);
Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
@@ -300,7 +300,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
return internalPlcConsumerRegistration;
- }*/
+ }
@Override
public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index 31b90ea..d7927cb 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -19,15 +19,17 @@ under the License.
package org.apache.plc4x.java.ads.model;
import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
-import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
+import org.apache.plc4x.java.base.model.DefaultPlcSubscriptionHandle;
import java.util.Objects;
-public class AdsSubscriptionHandle implements InternalPlcSubscriptionHandle {
+public class AdsSubscriptionHandle extends DefaultPlcSubscriptionHandle {
private NotificationHandle notificationHandle;
- public AdsSubscriptionHandle(NotificationHandle notificationHandle) {
+ public AdsSubscriptionHandle(PlcSubscriber plcSubscriber, NotificationHandle notificationHandle) {
+ super(plcSubscriber);
this.notificationHandle = notificationHandle;
}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
index fbb359e..1ba5cd9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
@@ -28,6 +28,7 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
import org.apache.plc4x.java.ads.api.generic.types.Invoke;
import org.apache.plc4x.java.ads.model.AdsDataType;
+import org.apache.plc4x.java.ads.model.AdsField;
import org.apache.plc4x.java.ads.model.DirectAdsField;
import org.apache.plc4x.java.ads.model.SymbolicAdsField;
import org.apache.plc4x.java.ads.protocol.exception.AdsException;
@@ -35,7 +36,6 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcProtocolPayloadTooBigException;
-import org.apache.plc4x.java.base.messages.PlcProprietaryRequest;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcRequest;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
@@ -292,8 +292,7 @@ public class Plc4x2AdsProtocol extends MessageToMessageCodec<AmsPacket, PlcReque
InternalPlcReadRequest plcReadRequest = (InternalPlcReadRequest) requestContainer.getRequest();
// TODO: only single requests supported for now
- DirectAdsField field = (DirectAdsField) plcReadRequest.getFields().get(0);
-
+ AdsField field = (AdsField) plcReadRequest.getFields().get(0);
PlcResponseCode responseCode = decodeResponseCode(responseMessage.getResult());
byte[] bytes = responseMessage.getData().getBytes();
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 491f606..f65d189 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -21,9 +21,13 @@ package org.apache.plc4x.java.ads;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class ManualPlc4XAdsTest {
@@ -34,34 +38,48 @@ public class ManualPlc4XAdsTest {
connectionUrl = "ads:serial:///dev/ttys003/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
} else {
System.out.println("Using tcp");
- connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.1.1.1:30000";
+ connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
}
+ // TODO: temporary workaround
+ Thread.currentThread().setUncaughtExceptionHandler((t, e) -> {
+ System.err.println(t + " - " + e.getMessage());
+ e.printStackTrace(System.err);
+ System.exit(1);
+ });
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
System.out.println("PlcConnection " + plcConnection);
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build();
+ PlcReadRequest.Builder readRequestBuilder = plcConnection.readRequestBuilder().orElseThrow(RuntimeException::new);
+ PlcReadRequest readRequest = readRequestBuilder.addItem("station", "Allgemein_S2.Station:BYTE").build();
CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
PlcReadResponse readResponse = response.get();
System.out.println("Response " + readResponse);
Collection<Integer> stations = readResponse.getAllIntegers("station");
stations.forEach(System.out::println);
- PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
- CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute();
- PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get();
+ // 2. We build a subscription
+ PlcSubscriptionRequest.Builder subscriptionRequestBuilder = plcConnection.subscriptionRequestBuilder().orElseThrow(RuntimeException::new);
+ PlcSubscriptionRequest subscriptionRequest = subscriptionRequestBuilder.addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
+ PlcSubscriptionResponse plcSubscriptionResponse = subscriptionRequest.execute().get();
- // TODO: figure out what to do with this
- /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
+ List<PlcConsumerRegistration> plcConsumerRegistrations = plcSubscriptionResponse.getSubscriptionHandles().stream()
+ .map(plcSubscriptionHandle -> plcSubscriptionHandle.register(System.out::println))
+ .collect(Collectors.toList());
+ // Now we wait a bit
TimeUnit.SECONDS.sleep(5);
- plcSubscriber.unregister(plcConsumerRegistration);
- PlcUnsubscriptionRequest unsubscriptionRequest = plcConnection.unsubscriptionRequestBuilder().get().addHandles(plcSubscriptionResponse.getSubscriptionHandles()).build();
- CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest);
+ // we unregister the listener
+ plcConsumerRegistrations.forEach(PlcConsumerRegistration::unregister);
+
+ // we unsubscribe at the plc
+ PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder = plcConnection.unsubscriptionRequestBuilder().orElseThrow(RuntimeException::new);
+ PlcUnsubscriptionRequest unsubscriptionRequest = unsubscriptionRequestBuilder.addHandles(plcSubscriptionResponse.getSubscriptionHandles()).build();
+ CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = unsubscriptionRequest.execute();
unsubscriptionResponse
.get(5, TimeUnit.SECONDS);
- System.out.println(unsubscriptionResponse);*/
+ System.out.println(unsubscriptionResponse);
}
System.exit(0);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 815b46e..fd0f595 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -18,27 +18,28 @@ under the License.
*/
package org.apache.plc4x.java.base.messages;
-import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.base.connection.PlcFieldHandler;
-import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
+import java.util.stream.Collectors;
-// TODO: request broken needs finishing.
public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
private final PlcSubscriber subscriber;
- public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+ private LinkedHashMap<String, SubscriptionPlcField> fields;
+
+ public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields) {
this.subscriber = subscriber;
+ this.fields = fields;
}
@Override
@@ -48,32 +49,39 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
@Override
public int getNumberOfFields() {
- throw new IllegalStateException("not available");
+ return fields.size();
}
@Override
public LinkedHashSet<String> getFieldNames() {
- throw new IllegalStateException("not available");
+ return new LinkedHashSet<>(fields.keySet());
}
@Override
public PlcField getField(String name) {
- throw new IllegalStateException("not available");
+ SubscriptionPlcField subscriptionPlcField = fields.get(name);
+ if (subscriptionPlcField == null) {
+ return null;
+ }
+ return subscriptionPlcField.getPlcField();
}
@Override
public LinkedList<PlcField> getFields() {
- throw new IllegalStateException("not available");
+ return fields.values().stream().map(SubscriptionPlcField::getPlcField).collect(Collectors.toCollection(LinkedList::new));
}
@Override
- public PlcSubscriptionType getPlcSubscriptionType() {
- throw new IllegalStateException("not available");
+ public LinkedList<SubscriptionPlcField> getSubscriptionFields() {
+ return new LinkedList<>(fields.values());
}
@Override
public LinkedList<Pair<String, PlcField>> getNamedFields() {
- throw new IllegalStateException("not available");
+ return fields.entrySet()
+ .stream()
+ .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), (PlcField) stringPlcFieldEntry.getValue()))
+ .collect(Collectors.toCollection(LinkedList::new));
}
public static class Builder implements PlcSubscriptionRequest.Builder {
@@ -90,41 +98,48 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
@Override
public PlcSubscriptionRequest.Builder addCyclicField(String name, String fieldQuery, Duration pollingInterval) {
- return null;
+ fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval));
+ return this;
}
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateField(String name, String fieldQuery) {
- return null;
+ fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE));
+ return this;
}
@Override
public PlcSubscriptionRequest.Builder addEventField(String name, String fieldQuery) {
- return null;
+ fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.EVENT));
+ return this;
}
@Override
public PlcSubscriptionRequest build() {
- LinkedHashMap<String, Pair<PlcField, FieldItem>> parsedFields = new LinkedHashMap<>();
+ LinkedHashMap<String, SubscriptionPlcField> parsedFields = new LinkedHashMap<>();
+
fields.forEach((name, builderItem) -> {
- // Compile the query string.
PlcField parsedField = fieldHandler.createField(builderItem.fieldQuery);
- // Encode the payload.
- // TODO: Depending on the field type, handle the FieldItem creation differently.
- FieldItem fieldItem = builderItem.encoder.apply(parsedField, null);
- parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
+ parsedFields.put(name, new SubscriptionPlcField(builderItem.plcSubscriptionType, parsedField, builderItem.duration));
});
- return new DefaultPlcSubscriptionRequest(subscriber);
+ return new DefaultPlcSubscriptionRequest(subscriber, parsedFields);
}
private static class BuilderItem<T> {
private final String fieldQuery;
- private final BiFunction<PlcField, T[], FieldItem> encoder;
+ private final PlcSubscriptionType plcSubscriptionType;
+ private final Duration duration;
- private BuilderItem(String fieldQuery, BiFunction<PlcField, T[], FieldItem> encoder) {
+ private BuilderItem(String fieldQuery, PlcSubscriptionType plcSubscriptionType) {
+ this(fieldQuery, plcSubscriptionType, null);
+ }
+
+ private BuilderItem(String fieldQuery, PlcSubscriptionType plcSubscriptionType, Duration duration) {
this.fieldQuery = fieldQuery;
- this.encoder = encoder;
+ this.plcSubscriptionType = plcSubscriptionType;
+ this.duration = duration;
}
+
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 3e13d57..4f162d4 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -18,19 +18,19 @@
*/
package org.apache.plc4x.java.base.messages;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
-import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-// TODO: request broken needs finishing.
-public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
+public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcRequest {
private final PlcSubscriber subscriber;
@@ -47,35 +47,10 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
}
@Override
- public int getNumberOfFields() {
- throw new IllegalStateException("not available");
- }
-
- @Override
- public LinkedHashSet<String> getFieldNames() {
- throw new IllegalStateException("not available");
- }
-
- @Override
- public PlcField getField(String name) {
- throw new IllegalStateException("not available");
- }
-
- @Override
- public LinkedList<PlcField> getFields() {
- throw new IllegalStateException("not available");
- }
-
- @Override
public Collection<? extends InternalPlcSubscriptionHandle> getInternalPlcSubscriptionHandles() {
return internalPlcSubscriptionHandles;
}
- @Override
- public LinkedList<Pair<String, PlcField>> getNamedFields() {
- throw new IllegalStateException("not available");
- }
-
public static class Builder implements PlcUnsubscriptionRequest.Builder {
private final PlcSubscriber subscriber;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
index 2207e75..e3f855a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
@@ -19,29 +19,10 @@
package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
-import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
-
-import java.util.Collection;
public class DefaultPlcUnsubscriptionResponse implements InternalPlcUnsubscriptionResponse {
@Override
- public Collection<String> getFieldNames() {
- return null;
- }
-
- @Override
- public PlcField getField(String name) {
- return null;
- }
-
- @Override
- public PlcResponseCode getResponseCode(String name) {
- return null;
- }
-
- @Override
public PlcUnsubscriptionRequest getRequest() {
return null;
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 3d4ca98..608691a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -19,10 +19,11 @@
package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
-public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
+import java.util.LinkedList;
- PlcSubscriptionType getPlcSubscriptionType();
+public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
+ LinkedList<SubscriptionPlcField> getSubscriptionFields();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
index 7a8c3db..fd5b3ca 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
@@ -23,7 +23,7 @@ import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
import java.util.Collection;
-public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest, InternalPlcFieldRequest {
+public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest, InternalPlcRequest {
Collection<? extends InternalPlcSubscriptionHandle> getInternalPlcSubscriptionHandles();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
index 577a774..8d7eedb 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
@@ -22,10 +22,8 @@ import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
-import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
/**
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
index 28583b3..55b1cf9 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
@@ -19,6 +19,7 @@
package org.apache.plc4x.java.base.model;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
import java.util.Arrays;
import java.util.Collection;
@@ -27,12 +28,16 @@ import java.util.function.Consumer;
public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegistration {
+ private final PlcSubscriber plcSubscriber;
+
private final Collection<? extends InternalPlcSubscriptionHandle> handles;
+
private final int consumerHash;
- public DefaultPlcConsumerRegistration(Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
- consumerHash = Objects.requireNonNull(consumer).hashCode();
+ public DefaultPlcConsumerRegistration(PlcSubscriber plcSubscriber, Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
+ this.plcSubscriber = plcSubscriber;
this.handles = Arrays.asList(Objects.requireNonNull(handles));
+ this.consumerHash = Objects.requireNonNull(consumer).hashCode();
}
@Override
@@ -70,4 +75,9 @@ public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegist
", consumerHash=" + consumerHash +
'}';
}
+
+ @Override
+ public void unregister() {
+ plcSubscriber.unregister(this);
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
similarity index 52%
copy from plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
copy to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
index 28583b3..3eb685b 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
@@ -16,33 +16,28 @@
specific language governing permissions and limitations
under the License.
*/
+
package org.apache.plc4x.java.base.model;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
import java.util.Objects;
import java.util.function.Consumer;
-public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegistration {
-
- private final Collection<? extends InternalPlcSubscriptionHandle> handles;
- private final int consumerHash;
+public class DefaultPlcSubscriptionHandle implements InternalPlcSubscriptionHandle {
- public DefaultPlcConsumerRegistration(Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
- consumerHash = Objects.requireNonNull(consumer).hashCode();
- this.handles = Arrays.asList(Objects.requireNonNull(handles));
- }
+ private final PlcSubscriber plcSubscriber;
- @Override
- public int getConsumerHash() {
- return consumerHash;
+ public DefaultPlcSubscriptionHandle(PlcSubscriber plcSubscriber) {
+ this.plcSubscriber = plcSubscriber;
}
@Override
- public Collection<? extends InternalPlcSubscriptionHandle> getAssociatedHandles() {
- return handles;
+ public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
+ return plcSubscriber.register(consumer, Collections.singletonList(this));
}
@Override
@@ -50,24 +45,22 @@ public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegist
if (this == o) {
return true;
}
- if (!(o instanceof DefaultPlcConsumerRegistration)) {
+ if (!(o instanceof DefaultPlcSubscriptionHandle)) {
return false;
}
- DefaultPlcConsumerRegistration that = (DefaultPlcConsumerRegistration) o;
- return consumerHash == that.consumerHash &&
- Objects.equals(handles, that.handles);
+ DefaultPlcSubscriptionHandle that = (DefaultPlcSubscriptionHandle) o;
+ return Objects.equals(plcSubscriber, that.plcSubscriber);
}
@Override
public int hashCode() {
- return Objects.hash(handles, consumerHash);
+ return Objects.hash(plcSubscriber);
}
@Override
public String toString() {
- return "DefaultPlcConsumerRegistration{" +
- "handles=" + handles +
- ", consumerHash=" + consumerHash +
+ return "DefaultPlcSubscriptionHandle{" +
+ "plcSubscriber=" + plcSubscriber +
'}';
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java
new file mode 100644
index 0000000..e68e4c8
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java
@@ -0,0 +1,55 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.model;
+
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Bundles a {@link PlcField} with a {@link PlcSubscriptionType} and an optional {@link Duration}; it wraps the field and does not itself extend or implement {@link PlcField}.
+ */
+public class SubscriptionPlcField {
+
+ private final PlcSubscriptionType plcSubscriptionType;
+
+ private final PlcField plcField;
+
+ private final Duration duration; // may be null; getDuration() exposes it as an Optional
+
+ public SubscriptionPlcField(PlcSubscriptionType plcSubscriptionType, PlcField plcField, Duration duration) {
+ this.plcSubscriptionType = plcSubscriptionType; // stored as given, no null check
+ this.plcField = plcField; // stored as given, no null check
+ this.duration = duration;
+ }
+
+ public PlcSubscriptionType getPlcSubscriptionType() {
+ return plcSubscriptionType;
+ }
+
+ public PlcField getPlcField() {
+ return plcField;
+ }
+
+ public Optional<Duration> getDuration() {
+ return Optional.ofNullable(duration); // empty when duration is null
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 5a9ac98..b3c6458 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -25,15 +25,13 @@ import io.netty.channel.PendingWriteQueue;
import io.netty.util.HashedWheelTimer;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.plc4x.java.base.messages.PlcReader;
-import org.apache.plc4x.java.base.messages.PlcSubscriber;
-import org.apache.plc4x.java.base.messages.PlcWriter;
import org.apache.plc4x.java.api.messages.PlcFieldRequest;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.*;
import org.apache.plc4x.java.base.messages.items.FieldItem;
import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
import org.assertj.core.api.WithAssertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -434,13 +432,13 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
- private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
- super(subscriber);
+ private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields) {
+ super(subscriber, fields);
}
private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
// TODO: implement me once available
- return new TestDefaultPlcSubscriptionRequest(subscriber);
+ return new TestDefaultPlcSubscriptionRequest(subscriber, new LinkedHashMap<>());
}
}