You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/07/11 14:57:50 UTC
[incubator-plc4x] 01/01: PLC4X-40 - Refactor the PlcSubscriber to
be more aligned with the PlcReader and PlcWriter
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 748a39d8f3cdbabb9f4e086f1aaa4877ed042dd4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Jul 11 16:57:39 2018 +0200
PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter
---
.../java/org/apache/plc4x/camel/Plc4XConsumer.java | 31 ++-
.../java/org/apache/plc4x/camel/ManualTest.java | 2 +-
.../java/org/apache/plc4x/camel/MockDriver.java | 57 ++++--
.../plc4x/java/api/connection/PlcSubscriber.java | 24 +--
.../plc4x/java/api/messages/PlcNotification.java | 80 --------
.../apache/plc4x/java/api/messages/PlcRequest.java | 2 +-
.../java/api/messages/PlcRequestContainer.java | 8 +
...ResponseItem.java => PlcSubscriptionEvent.java} | 18 +-
...sponseItem.java => PlcSubscriptionRequest.java} | 10 +-
...ponseItem.java => PlcSubscriptionResponse.java} | 13 +-
.../api/messages/PlcUnsubscriptionRequest.java | 54 ++++++
...nseItem.java => PlcUnsubscriptionResponse.java} | 11 +-
.../java/api/messages/items/ReadRequestItem.java | 32 ++--
.../java/api/messages/items/ReadResponseItem.java | 28 +--
.../plc4x/java/api/messages/items/RequestItem.java | 28 +--
.../java/api/messages/items/ResponseItem.java | 28 +--
.../api/messages/items/SubscriptionEventItem.java | 48 +++++
...a => SubscriptionRequestChangeOfStateItem.java} | 11 +-
...tem.java => SubscriptionRequestCyclicItem.java} | 25 ++-
...Item.java => SubscriptionRequestEventItem.java} | 11 +-
.../messages/items/SubscriptionRequestItem.java | 45 +++++
.../messages/items/SubscriptionResponseItem.java | 37 ++++
.../messages/items/UnsubscriptionRequestItem.java | 62 ++++++
.../messages/items/UnsubscriptionResponseItem.java | 29 +++
.../java/api/messages/items/WriteRequestItem.java | 28 +--
.../java/api/messages/items/WriteResponseItem.java | 28 +--
.../SubscriptionHandle.java} | 19 +-
.../SubscriptionType.java} | 25 ++-
.../ads/connection/AdsAbstractPlcConnection.java | 5 +
.../java/ads/connection/AdsTcpPlcConnection.java | 208 ++++++++++++---------
.../java/ads/model/AdsSubscriptionHandle.java} | 17 +-
.../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 24 ++-
.../ads/connection/AdsTcpPlcConnectionTests.java | 46 +++--
pom.xml | 3 +
34 files changed, 722 insertions(+), 375 deletions(-)
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 08891e7..16fc626 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -26,14 +26,21 @@ import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.items.*;
import org.apache.plc4x.java.api.model.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcNotification<Object>> {
+public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<SubscriptionEventItem> {
private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
private Plc4XEndpoint endpoint;
@@ -42,6 +49,7 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
private PlcConnection plcConnection;
private Address address;
private Class<?> dataType;
+ private PlcSubscriptionResponse subscriptionResponse;
public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
@@ -73,13 +81,20 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
}
@Override
- protected void doStart() {
- getSubscriber().subscribe(this, address, dataType);
+ protected void doStart() throws InterruptedException, ExecutionException, TimeoutException {
+ PlcSubscriptionRequest request = new PlcSubscriptionRequest();
+ request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3));
+ CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request);
+ subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
}
@Override
protected void doStop() {
- getSubscriber().unsubscribe(this, address);
+ PlcUnsubscriptionRequest request = new PlcUnsubscriptionRequest();
+ subscriptionResponse.getResponseItems().stream()
+ .map(SubscriptionResponseItem::getSubscriptionHandle)
+ .map(UnsubscriptionRequestItem::new)
+ .forEach(request::addItem);
try {
plcConnection.close();
} catch (Exception e) {
@@ -92,11 +107,11 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
}
@Override
- public void accept(PlcNotification<Object> plcNotification) {
- LOGGER.debug("Received {}", plcNotification);
+ public void accept(SubscriptionEventItem subscriptionEventItem) {
+ LOGGER.debug("Received {}", subscriptionEventItem);
try {
Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(unwrapIfSingle(plcNotification.getValues()));
+ exchange.getIn().setBody(unwrapIfSingle(subscriptionEventItem.getValues()));
processor.process(exchange);
} catch (Exception e) {
exceptionHandler.handleException(e);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
index cd96f9f..9eff617 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
@@ -53,7 +53,7 @@ public class ManualTest {
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
- from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
+ from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.3.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
.process(exchange -> System.out.println("Invoked timer at " + new Date()))
.bean("foo")
.log("Received ${body}");
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index dca996b..f7c00cb 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -25,18 +25,27 @@ import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Calendar;
import java.util.Collections;
-import java.util.Date;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.mockito.Mockito.*;
@@ -58,6 +67,7 @@ public class MockDriver implements PlcDriver {
@Override
public PlcConnection connect(String url) {
+ // Mock a connection.
PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
try {
when(plcConnectionMock.parseAddress(anyString())).thenReturn(mock(Address.class));
@@ -65,29 +75,40 @@ public class MockDriver implements PlcDriver {
throw new RuntimeException(e);
}
when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
+
+ // Mock a typical subscriber.
PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
- doAnswer(invocation -> {
+ when(plcSubscriber.subscribe(any())).thenAnswer(invocation -> {
LOGGER.info("Received {}", invocation);
- Consumer consumer = invocation.getArgument(0);
- executorService.submit(() -> {
- while (!Thread.currentThread().isInterrupted()) {
- consumer.accept(new PlcNotification(new Date(), mock(Address.class), Collections.singletonList("HelloWorld")));
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
- });
- return null;
- }).when(plcSubscriber).subscribe(any(), any(), any());
+ PlcSubscriptionRequest subscriptionRequest = invocation.getArgument(0);
+ List<SubscriptionResponseItem<?>> responseItems =
+ subscriptionRequest.getRequestItems().stream().map(subscriptionRequestItem -> {
+ Consumer consumer = subscriptionRequestItem.getConsumer();
+ executorService.submit(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ consumer.accept(new SubscriptionEventItem<>(null, Calendar.getInstance(), Collections.singletonList("HelloWorld")));
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ return new SubscriptionResponseItem<>(subscriptionRequestItem,
+ mock(SubscriptionHandle.class, RETURNS_DEEP_STUBS), ResponseCode.OK);
+ }).collect(Collectors.toList());
+ PlcSubscriptionResponse response = new PlcSubscriptionResponse(subscriptionRequest, responseItems);
+ CompletableFuture<PlcSubscriptionResponse> responseFuture = new CompletableFuture<>();
+ responseFuture.complete(response);
+ return responseFuture;
+ });
when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
return plcConnectionMock;
}
@Override
- public PlcConnection connect(String url, PlcAuthentication authentication) throws PlcConnectionException {
+ public PlcConnection connect(String url, PlcAuthentication authentication) {
return connect(null);
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index 4296df3..cd52d48 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -18,9 +18,10 @@
*/
package org.apache.plc4x.java.api.connection;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.Address;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
@@ -29,21 +30,20 @@ import java.util.function.Consumer;
public interface PlcSubscriber {
/**
- * Subscribes a {@code consumer} to a {@code address} parsing values as {@code dataType}.
- * {@code consumer} and {@code address} are used as unique identification.
+ * Subscribes to addresses on the PLC.
*
- * @param consumer to be subscribed.
- * @param address to be read.
- * @param dataType to be decoded.
+ * @param subscriptionRequest subscription request containing at least one subscription request item.
+ * @return subscription response containing a subscription response item for each subscription request item.
*/
- <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType);
-
+ CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest);
/**
- * Unsubscribes a {@code consumer}.
- * {@code consumer} and {@code address} are used as unique identification.
+ * Unsubscribes from addresses on the PLC. For unsubscribing the unsubscription request uses the subscription
+ * handle returned as part of the subscription response item.
*
- * @param consumer to be unsubscribed.
+ * @param unsubscriptionRequest unsubscription request containing at least one unsubscription request item.
+ * @return unsubscription response containing an unsubscription response item for each unsubscription request item.
*/
- <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address);
+ CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest);
+
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
deleted file mode 100644
index ce68982..0000000
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.api.messages;
-
-import org.apache.plc4x.java.api.model.Address;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
-
-public class PlcNotification<T> {
-
- protected final Date timeStamp;
-
- protected final Address address;
-
- protected final List<T> values;
-
- public PlcNotification(Date timeStamp, Address address, List<T> values) {
- this.timeStamp = timeStamp;
- this.address = address;
- this.values = values;
- }
-
- public Date getTimeStamp() {
- return timeStamp;
- }
-
- public Address getAddress() {
- return address;
- }
-
- public List<T> getValues() {
- return values;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof PlcNotification)) {
- return false;
- }
- PlcNotification<?> that = (PlcNotification<?>) o;
- return Objects.equals(timeStamp, that.timeStamp) &&
- Objects.equals(address, that.address) &&
- Objects.equals(values, that.values);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timeStamp, address, values);
- }
-
- @Override
- public String toString() {
- return "PlcNotification{" +
- "timeStamp=" + timeStamp +
- ", address=" + address +
- ", values=" + values +
- '}';
- }
-}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
index 3ad5fc1..3ce851b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
@@ -101,7 +101,7 @@ public abstract class PlcRequest<REQUEST_ITEM extends RequestItem> implements Pl
@Override
public String toString() {
return "PlcRequest{" +
- "requestItems=" + requestItems +
+ "eventItems=" + requestItems +
'}';
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
index 4168246..f7259f6 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
@@ -21,6 +21,14 @@ package org.apache.plc4x.java.api.messages;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+/**
+ * Helper mostly used internally to split up big requests into multiple sub-requests.
+ *
+ * TODO: Think about moving this into one of the driver-base modules.
+ *
+ * @param <T> type of the request (a {@link PlcRequest})
+ * @param <R> type of the response (a {@link PlcResponse})
+ */
public class PlcRequestContainer<T extends PlcRequest, R extends PlcResponse> implements ProtocolMessage {
private final T request;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
similarity index 63%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
index e923524..e8b1d0d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
@@ -1,3 +1,9 @@
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+
+import java.util.List;
+
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +22,16 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
+public class PlcSubscriptionEvent {
-import org.apache.plc4x.java.api.types.ResponseCode;
+ protected final List<SubscriptionEventItem<?>> eventItems;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+ public PlcSubscriptionEvent(List<SubscriptionEventItem<?>> eventItems) {
+ this.eventItems = eventItems;
+ }
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+ public List<SubscriptionEventItem<?>> getEventItems() {
+ return eventItems;
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index e923524..65b0659 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +17,9 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
- }
+public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> {
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
index e923524..3c271df 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.List;
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+public class PlcSubscriptionResponse extends PlcResponse<PlcSubscriptionRequest, SubscriptionResponseItem<?>, SubscriptionRequestItem<?>> {
+
+ public PlcSubscriptionResponse(PlcSubscriptionRequest request, List<SubscriptionResponseItem<?>> subscriptionResponseItems) {
+ super(request, subscriptionResponseItems);
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
new file mode 100644
index 0000000..94377d3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+public class PlcUnsubscriptionRequest implements PlcMessage {
+
+ protected final List<UnsubscriptionRequestItem> requestItems;
+
+ public PlcUnsubscriptionRequest() {
+ this.requestItems = new LinkedList<>();
+ }
+
+ public PlcUnsubscriptionRequest(List<UnsubscriptionRequestItem> requestItems) {
+ Objects.requireNonNull(requestItems, "Request items must not be null");
+ this.requestItems = requestItems;
+ }
+
+ public void addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) {
+ Objects.requireNonNull(unsubscriptionRequestItem, "Request item must not be null");
+ getRequestItems().add(unsubscriptionRequestItem);
+ }
+
+ public List<UnsubscriptionRequestItem> getRequestItems() {
+ return requestItems;
+ }
+
+ public int getNumberOfItems() {
+ return getRequestItems().size();
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index e923524..f75415a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +17,6 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
-
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
- }
+public class PlcUnsubscriptionResponse implements PlcMessage {
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
index 222310e..bc77dc5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
index 9d5a4d6..f7736e8 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
index 4df0e4e..39a3ec2 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
index 34f61b1..5cb6d9b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
new file mode 100644
index 0000000..49a0200
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
@@ -0,0 +1,48 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import java.util.Calendar;
+import java.util.List;
+
+public class SubscriptionEventItem<T> {
+ // Plain holder bundling a subscription request item, a timestamp and a list of values.
+ private SubscriptionRequestItem<T> subscriptionRequestItem;
+ private Calendar timestamp;
+ private List<T> values;
+ /** Stores the three arguments exactly as given; no null checks and no defensive copies are made. */
+ public SubscriptionEventItem(SubscriptionRequestItem<T> subscriptionRequestItem, Calendar timestamp, List<T> values) {
+ this.subscriptionRequestItem = subscriptionRequestItem;
+ this.timestamp = timestamp;
+ this.values = values;
+ }
+ /** @return the request item that was passed to the constructor. */
+ public SubscriptionRequestItem<T> getSubscriptionRequestItem() {
+ return subscriptionRequestItem;
+ }
+ /** @return the very Calendar instance passed to the constructor (not a copy). */
+ public Calendar getTimestamp() {
+ return timestamp;
+ }
+ /** @return the very List instance passed to the constructor (not copied or wrapped). */
+ public List<T> getValues() {
+ return values;
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
similarity index 66%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index e923524..57db70f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -18,12 +18,15 @@ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem {
+ // Fixes the subscription type to CHANGE_OF_STATE. NOTE(review): raw supertype and raw Class/Consumer parameters bypass generic type checks -- consider a type parameter.
+ public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) {
+ super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
similarity index 54%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index e923524..b1a491b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -18,12 +18,29 @@ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+ // Request item with subscription type CYCLIC plus a period and a time unit. NOTE(review): raw supertype and raw Class/Consumer parameters bypass generic type checks.
+ private TimeUnit timeUnit;
+ private int period;
+ /** Passes SubscriptionType.CYCLIC to the superclass; timeUnit and period are stored without validation. */
+ public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) {
+ super(datatype, address, SubscriptionType.CYCLIC, consumer);
+ this.timeUnit = timeUnit;
+ this.period = period;
+ }
+ /** @return the time unit passed to the constructor. */
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+ /** @return the period passed to the constructor; presumably expressed in getTimeUnit() units -- TODO confirm. */
+ public int getPeriod() {
+ return period;
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
similarity index 68%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e923524..8ed69c5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -18,12 +18,15 @@ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+ // Fixes the subscription type to EVENT. NOTE(review): raw supertype and raw Class/Consumer parameters bypass generic type checks -- consider a type parameter.
+ public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) {
+ super(datatype, address, SubscriptionType.EVENT, consumer);
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
new file mode 100644
index 0000000..7591e09
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
@@ -0,0 +1,45 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
+
+import java.util.function.Consumer;
+
+public abstract class SubscriptionRequestItem<T> extends RequestItem<T> {
+ // Abstract request item that adds a subscription type and an event consumer on top of RequestItem.
+ private SubscriptionType subscriptionType;
+ private Consumer<SubscriptionEventItem<T>> consumer;
+ /** Forwards datatype and address to RequestItem; subscriptionType and consumer are stored here without null checks. */
+ public SubscriptionRequestItem(Class<T> datatype, Address address, SubscriptionType subscriptionType, Consumer<SubscriptionEventItem<T>> consumer) {
+ super(datatype, address);
+ this.subscriptionType = subscriptionType;
+ this.consumer = consumer;
+ }
+ /** @return the subscription type passed to the constructor. */
+ public SubscriptionType getSubscriptionType() {
+ return subscriptionType;
+ }
+ /** @return the consumer passed to the constructor. */
+ public Consumer<SubscriptionEventItem<T>> getConsumer() {
+ return consumer;
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
new file mode 100644
index 0000000..c38e28b
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
@@ -0,0 +1,37 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class SubscriptionResponseItem<T> extends ResponseItem<SubscriptionRequestItem<T>> {
+ // Response item for a SubscriptionRequestItem that additionally carries a SubscriptionHandle.
+ private SubscriptionHandle subscriptionHandle;
+ /** Forwards requestItem and responseCode to ResponseItem; the handle is stored here without a null check. */
+ public SubscriptionResponseItem(SubscriptionRequestItem<T> requestItem, SubscriptionHandle subscriptionHandle, ResponseCode responseCode) {
+ super(requestItem, responseCode);
+ this.subscriptionHandle = subscriptionHandle;
+ }
+ /** @return the handle passed to the constructor (may be null, since nothing in this class rejects null). */
+ public SubscriptionHandle getSubscriptionHandle() {
+ return subscriptionHandle;
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
new file mode 100644
index 0000000..d3312f3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
@@ -0,0 +1,62 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.Objects;
+
+public class UnsubscriptionRequestItem {
+ // Request item wrapping a single, non-null SubscriptionHandle.
+ private SubscriptionHandle subscriptionHandle;
+ /** @throws NullPointerException if subscriptionHandle is null. */
+ public UnsubscriptionRequestItem(SubscriptionHandle subscriptionHandle) {
+ Objects.requireNonNull(subscriptionHandle, "SubscriptionHandle must not be null");
+ this.subscriptionHandle = subscriptionHandle;
+ }
+ /** @return the handle passed to the constructor; never null because the constructor rejects null. */
+ public SubscriptionHandle getSubscriptionHandle() {
+ return subscriptionHandle;
+ }
+
+ @Override
+ public String toString() {
+ return "UnsubscriptionRequestItem{" +
+ "subscriptionHandle=" + subscriptionHandle +
+ '}';
+ }
+ // Two items are equal when their handles are equal according to the handle's own equals().
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UnsubscriptionRequestItem)) {
+ return false;
+ }
+ UnsubscriptionRequestItem that = (UnsubscriptionRequestItem) o;
+ return Objects.equals(subscriptionHandle, that.subscriptionHandle);
+ }
+ // The hash is derived from the handle only, matching the field compared in equals().
+ @Override
+ public int hashCode() {
+ return Objects.hash(subscriptionHandle);
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
new file mode 100644
index 0000000..8a7c659
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
@@ -0,0 +1,29 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class UnsubscriptionResponseItem {
+ // NOTE(review): this class declares no fields or accessors, so an instance carries no data.
+ public UnsubscriptionResponseItem(UnsubscriptionRequestItem requestItem, ResponseCode responseCode) {
+ // NOTE(review): requestItem and responseCode are discarded and cannot be read back -- confirm whether they should be stored.
+ }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
index 3acbef5..9aba923 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
index e923524..9928b7d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
@@ -1,20 +1,20 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
*/
package org.apache.plc4x.java.api.messages.items;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
index e923524..445d9e9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
- }
+/**
+ * When subscribing to remote resources, the data needed to identify a
+ * subscription depends on the protocol being used. This interface is
+ * to be implemented by the individual driver implementations to contain
+ * all information needed to pull or unsubscribe any form of subscription.
+ *
+ * For every subscribed item, a separate {@link SubscriptionHandle} object is
+ * returned in order to allow fine-grained unsubscriptions.
+ */
+public interface SubscriptionHandle {
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
similarity index 57%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
index e923524..287420b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,14 +17,26 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
-import org.apache.plc4x.java.api.types.ResponseCode;
+/**
+ * {@link SubscriptionType} specifies the nature of the subscription.
+ * In general PLC4X supports exactly 3 types of subscriptions.
+ */
+public enum SubscriptionType {
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+ /**
+ * A cyclic subscription where a value is sent no matter if its value changed in a given interval.
+ */
+ CYCLIC,
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
- }
+ /**
+ * Only send data if a value in the PLC changed.
+ */
+ CHANGE_OF_STATE,
+
+ /**
+ * Subscribe to events created by the PLC, which usually are defined in the PLC's application (Alarms).
+ */
+ EVENT
}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index 2304199..cf1b8d9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -149,6 +149,8 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
}
protected void mapAddress(SymbolicAdsAddress symbolicAdsAddress) {
+ // If the map doesn't contain an entry for the given symbolicAdsAddress,
+ // resolve it and add it to the map.
addressMapping.computeIfAbsent(symbolicAdsAddress, symbolicAdsAddressInternal -> {
LOGGER.debug("Resolving {}", symbolicAdsAddressInternal);
AdsReadWriteRequest adsReadWriteRequest = AdsReadWriteRequest.of(
@@ -163,13 +165,16 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
Data.of(symbolicAdsAddressInternal.getSymbolicAddress())
);
+ // TODO: This is blocking, should be changed to be async.
CompletableFuture<PlcProprietaryResponse<AdsReadWriteResponse>> getHandelFuture = new CompletableFuture<>();
channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsReadWriteRequest), getHandelFuture));
PlcProprietaryResponse<AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
AdsReadWriteResponse response = getHandleResponse.getResponse();
+
if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
throw new PlcRuntimeException("Non error code received " + response.getResult());
}
+
IndexOffset symbolHandle = IndexOffset.of(response.getData().getBytes());
return AdsAddress.of(IndexGroup.ReservedGroups.ADSIGRP_SYM_VALBYHND.getAsLong(), symbolHandle.getAsLong());
});
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 4a04555..124d8d6 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -22,13 +22,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.ads.api.commands.*;
import org.apache.plc4x.java.ads.api.commands.types.*;
import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
import org.apache.plc4x.java.ads.api.generic.types.Invoke;
import org.apache.plc4x.java.ads.model.AdsAddress;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
@@ -37,11 +37,13 @@ import org.apache.plc4x.java.ads.protocol.util.LittleEndianDecoder;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.types.ResponseCode;
import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,10 +67,6 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
private static AtomicInteger localPorts = new AtomicInteger(30000);
- private final Map<Pair<Consumer<? extends PlcNotification>, Address>, Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle>> subscriberMap = new HashMap<>();
-
- private final Map<NotificationHandle, Consumer<? extends PlcNotification>> handleConsumerMap = new HashMap<>();
-
private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) {
this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort());
}
@@ -132,99 +130,129 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
}
@Override
- public <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType) {
- Objects.requireNonNull(consumer);
- Objects.requireNonNull(address);
- IndexGroup indexGroup;
- IndexOffset indexOffset;
- if (address instanceof SymbolicAdsAddress) {
- mapAddress((SymbolicAdsAddress) address);
- AdsAddress adsAddress = addressMapping.get(address);
- if (adsAddress == null) {
- throw new PlcRuntimeException("Unresolvable address" + address);
+ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+ // TODO: Make this multi-value
+ CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
+ if(subscriptionRequest.getNumberOfItems() == 1) {
+ SubscriptionRequestItem<?> subscriptionRequestItem = subscriptionRequest.getRequestItem().orElse(null);
+
+ Objects.requireNonNull(subscriptionRequestItem);
+ Objects.requireNonNull(subscriptionRequestItem.getConsumer());
+ Objects.requireNonNull(subscriptionRequestItem.getAddress());
+ Objects.requireNonNull(subscriptionRequestItem.getDatatype());
+
+ Address address = subscriptionRequestItem.getAddress();
+ Class<?> datatype = subscriptionRequestItem.getDatatype();
+
+ IndexGroup indexGroup;
+ IndexOffset indexOffset;
+ // If this is a symbolic address, it has to be resolved first.
+ // TODO: This is blocking, should be changed to be async.
+ if (address instanceof SymbolicAdsAddress) {
+ mapAddress((SymbolicAdsAddress) address);
+ AdsAddress adsAddress = addressMapping.get(address);
+ if (adsAddress == null) {
+ throw new PlcRuntimeException("Unresolvable address" + address);
+ }
+ indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+ indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+ }
+ // If it's no symbolic address, we can continue immediately
+ // without having to do any resolving.
+ else if (address instanceof AdsAddress) {
+ AdsAddress adsAddress = (AdsAddress) address;
+ indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+ indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+ } else {
+ throw new IllegalArgumentException("Unsupported address type " + address.getClass());
}
- indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
- indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
- } else if (address instanceof AdsAddress) {
- AdsAddress adsAddress = (AdsAddress) address;
- indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
- indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
- } else {
- throw new IllegalArgumentException("Unssuported address type " + address.getClass());
- }
- AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
- targetAmsNetId,
- targetAmsPort,
- sourceAmsNetId,
- sourceAmsPort,
- Invoke.NONE,
- indexGroup,
- indexOffset,
- LittleEndianDecoder.getLengthFor(dataType, 1),
- TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
- MaxDelay.of(0),
- CycleTime.of(4000000)
- );
-
- CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
- channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
- PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
- AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
- if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
- throw new PlcRuntimeException("Non error code received " + response.getResult());
- }
- NotificationHandle notificationHandle = response.getNotificationHandle();
- handleConsumerMap.put(notificationHandle, consumer);
-
- Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
- adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
- Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
-
- adsStampHeader.getAdsNotificationSamples()
- .forEach(adsNotificationSample -> {
- Consumer<? extends PlcNotification> plcNotificationConsumer = handleConsumerMap.get(adsNotificationSample.getNotificationHandle());
- if (plcNotificationConsumer == null) {
- LOGGER.warn("Unmapped notification received {}", adsNotificationSample);
- return;
- }
- Data data = adsNotificationSample.getData();
- try {
- @SuppressWarnings("unchecked")
- List<R> decodeData = (List<R>) LittleEndianDecoder.decodeData(dataType, data.getBytes());
- consumer.accept(new PlcNotification<>(timeStamp, address, decodeData));
- } catch (PlcProtocolException | RuntimeException e) {
- LOGGER.error("Can't decode {}", data, e);
- }
- });
- });
- subscriberMap.put(Pair.of(consumer, address), Pair.of(adsDeviceNotificationRequestConsumer, notificationHandle));
- getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
- }
- @Override
- public <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address) {
- Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle> handlePair = subscriberMap.remove(Pair.of(consumer, address));
- if (handlePair != null) {
- NotificationHandle notificationHandle = handlePair.getRight();
- AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest = AdsDeleteDeviceNotificationRequest.of(
+ // Prepare the subscription request itself.
+ AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
targetAmsNetId,
targetAmsPort,
sourceAmsNetId,
sourceAmsPort,
Invoke.NONE,
- notificationHandle
+ indexGroup,
+ indexOffset,
+ LittleEndianDecoder.getLengthFor(datatype, 1),
+ TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
+ MaxDelay.of(0),
+ CycleTime.of(4000000)
);
- CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture = new CompletableFuture<>();
- channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
- PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse = getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
- AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+ // Send the request to the plc and wait for a response
+ // TODO: This is blocking, should be changed to be async.
+ CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
+ channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
+ PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+ AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
+
+ // Abort if we got anything but a successful response.
if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
- throw new PlcRuntimeException("Non error code received " + response.getResult());
+ throw new PlcRuntimeException("Error code received " + response.getResult());
}
+ AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(response.getNotificationHandle());
+ future.complete(new PlcSubscriptionResponse(subscriptionRequest, Collections.singletonList(
+ new SubscriptionResponseItem<>(subscriptionRequestItem, adsSubscriptionHandle, ResponseCode.OK))));
+
+ Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
+ adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
+ Calendar timeStamp = Calendar.getInstance();
+ timeStamp.setTime(adsStampHeader.getTimeStamp().getAsDate());
- getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(handlePair.getLeft());
- handleConsumerMap.remove(notificationHandle);
+ adsStampHeader.getAdsNotificationSamples()
+ .forEach(adsNotificationSample -> {
+ Data data = adsNotificationSample.getData();
+ try {
+ @SuppressWarnings("unchecked")
+ List<?> decodeData = LittleEndianDecoder.decodeData(datatype, data.getBytes());
+ SubscriptionEventItem subscriptionEventItem =
+ new SubscriptionEventItem(subscriptionRequestItem, timeStamp, decodeData);
+ subscriptionRequestItem.getConsumer().accept(subscriptionEventItem);
+ } catch (PlcProtocolException | RuntimeException e) {
+ LOGGER.error("Can't decode {}", data, e);
+ }
+ });
+ });
+ // TODO: What's this for? Is this still needed if we use the consumers in the subscriptions?
+ getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+ for (UnsubscriptionRequestItem unsubscriptionRequestItem : unsubscriptionRequest.getRequestItems()) {
+ Objects.requireNonNull(unsubscriptionRequestItem);
+ if(unsubscriptionRequestItem.getSubscriptionHandle() instanceof AdsSubscriptionHandle) {
+ AdsSubscriptionHandle adsSubscriptionHandle =
+ (AdsSubscriptionHandle) unsubscriptionRequestItem.getSubscriptionHandle();
+ AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest =
+ AdsDeleteDeviceNotificationRequest.of(
+ targetAmsNetId,
+ targetAmsPort,
+ sourceAmsNetId,
+ sourceAmsPort,
+ Invoke.NONE,
+ adsSubscriptionHandle.getNotificationHandle()
+ );
+ CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
+ new CompletableFuture<>();
+ channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(
+ adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
+
+ PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
+ getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
+ AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+ if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
+ throw new PlcRuntimeException("Non error code received " + response.getResult());
+ }
+ }
}
+ CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
+ future.complete(new PlcUnsubscriptionResponse());
+ return future;
}
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index e923524..2850fa9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -16,14 +16,21 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.plc4x.java.api.messages.items;
+package org.apache.plc4x.java.ads.model;
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+public class AdsSubscriptionHandle implements SubscriptionHandle {
- public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
- super(requestItem, responseCode);
+ private NotificationHandle notificationHandle;
+
+ public AdsSubscriptionHandle(NotificationHandle notificationHandle) {
+ this.notificationHandle = notificationHandle;
+ }
+
+ public NotificationHandle getNotificationHandle() {
+ return notificationHandle;
}
}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 7d8566e..3511e32 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,8 +22,8 @@ import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.*;
import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
import org.apache.plc4x.java.api.model.Address;
@@ -41,7 +41,7 @@ public class ManualPlc4XAdsTest {
connectionUrl = "ads:serial:///dev/ttys003/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
} else {
System.out.println("Using tcp");
- connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
+ connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.1.1.1:30000";
}
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
System.out.println("PlcConnection " + plcConnection);
@@ -57,11 +57,21 @@ public class ManualPlc4XAdsTest {
System.out.println("ResponseItem " + responseItem);
responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println);
- Consumer<PlcNotification<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
+ Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
- plcSubscriber.subscribe(notificationConsumer, address, Integer.class);
- TimeUnit.SECONDS.sleep(5);
- plcSubscriber.unsubscribe(notificationConsumer, address);
+ PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+ subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer));
+ CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest);
+ PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+ SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get();
+
+ PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+ unsubscriptionRequest.addItem(
+ new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+ CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
+ plcSubscriber.unsubscribe(unsubscriptionRequest);
+ PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+ System.out.println(unsubscriptionResponse);
}
System.exit(0);
}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
index a92a65e..74fbca0 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
@@ -29,10 +29,11 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
import org.apache.plc4x.java.ads.model.AdsAddress;
import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestChangeOfStateItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,9 +50,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import static org.hamcrest.core.IsNull.notNullValue;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
+import static org.hamcrest.core.IsEqual.equalTo;
public class AdsTcpPlcConnectionTests {
@@ -66,6 +69,7 @@ public class AdsTcpPlcConnectionTests {
@Before
public void setUp() throws Exception {
SUT = AdsTcpPlcConnection.of(InetAddress.getByName("localhost"), AmsNetId.of("0.0.0.0.0.0"), AmsPort.of(13));
+ // TODO: Refactor this to use the TestChannelFactory instead.
channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
FieldUtils.writeField(SUT, "channel", channelMock, true);
executorService = Executors.newFixedThreadPool(10);
@@ -115,6 +119,7 @@ public class AdsTcpPlcConnectionTests {
@Test
public void subscribe() throws Exception {
+ // TODO: Does this really test the driver implementation?
when(channelMock.writeAndFlush(any(PlcRequestContainer.class)))
.then(invocationOnMock -> {
PlcRequestContainer plcRequestContainer = invocationOnMock.getArgument(0);
@@ -158,19 +163,30 @@ public class AdsTcpPlcConnectionTests {
when(channelMock.pipeline().get(Plc4x2AdsProtocol.class)).thenReturn(plc4x2AdsProtocol);
CompletableFuture<?> notificationReceived = new CompletableFuture<>();
- Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
+ Consumer<SubscriptionEventItem<String>> plcNotificationConsumer = plcNotification -> {
LOGGER.info("Received {}", plcNotification);
notificationReceived.complete(null);
};
- SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("0/0"), String.class);
- SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("Main.by[0]"), String.class);
- notificationReceived.get(3, TimeUnit.SECONDS);
- }
-
- @Test
- public void unsubscribe() {
- Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
- };
- SUT.unsubscribe(plcNotificationConsumer, SUT.parseAddress("0/0"));
+ PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+ subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(
+ String.class, SUT.parseAddress("0/0"), plcNotificationConsumer));
+ /*subscriptionRequest.addItem(new SubscriptionRequestItem<>(
+ String.class, SUT.parseAddress("Main.by[0]"), plcNotificationConsumer));*/
+ CompletableFuture<? extends PlcSubscriptionResponse> subscriptionFuture = SUT.subscribe(subscriptionRequest);
+ PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+ //notificationReceived.get(3, TimeUnit.SECONDS);
+ assertThat(subscriptionResponse, notNullValue());
+ assertThat(subscriptionResponse.getNumberOfItems(), equalTo(1));
+
+ // Now unsubscribe again ...
+
+ // TODO: Setup the mock to actually perform the unsubscription.
+ /*PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+ for (SubscriptionResponseItem<?> subscriptionResponseItem : subscriptionResponse.getResponseItems()) {
+ unsubscriptionRequest.addItem(subscriptionResponseItem.getSubscriptionHandle());
+ }
+ CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = SUT.unsubscribe(unsubscriptionRequest);
+ PlcUnsubscriptionResponse plcUnsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+ assertThat(plcUnsubscriptionResponse, notNullValue());*/
}
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e2796f4..ab4629a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,6 +316,9 @@
<!-- Jenkins build related files -->
<exclude>.repository/**</exclude>
+
+ <!-- Data files created by examples running an embedded elasticsearch -->
+ <exclude>elasticsearch-data/**</exclude>
</excludes>
</configuration>
</plugin>