You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/08/16 08:35:22 UTC
[incubator-plc4x] branch master updated: Introduced build for
PlcSubscriptionRequest and PlcUnsubscriptionRequest + generified items for
subscription + reordered SubscriptionRequestCyclicItem so that consumer is
the last parameter. + adjusted manual test
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push:
new d04b40d Introduced build for PlcSubscriptionRequest and PlcUnsubscriptionRequest + generified items for subscription + reordered SubscriptionRequestCyclicItem so that consumer is the last parameter. + adjusted manual test
d04b40d is described below
commit d04b40dd50d401f06ccc7284b1b388044bfa7214
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 16 10:35:17 2018 +0200
Introduced build for PlcSubscriptionRequest and PlcUnsubscriptionRequest
+ generified items for subscription
+ reordered SubscriptionRequestCyclicItem so that consumer is the last parameter.
+ adjusted manual test
---
.../java/org/apache/plc4x/camel/Plc4XConsumer.java | 9 +++-
.../java/api/messages/PlcSubscriptionRequest.java | 51 ++++++++++++++++++-
.../api/messages/PlcUnsubscriptionRequest.java | 57 ++++++++++++++++++++++
.../SubscriptionRequestChangeOfStateItem.java | 4 +-
.../items/SubscriptionRequestCyclicItem.java | 6 +--
.../items/SubscriptionRequestEventItem.java | 6 +--
.../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 36 ++++++++------
7 files changed, 143 insertions(+), 26 deletions(-)
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 16fc626..e04e016 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -29,7 +29,10 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestCyclicItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
import org.apache.plc4x.java.api.model.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +86,9 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
@Override
protected void doStart() throws InterruptedException, ExecutionException, TimeoutException {
PlcSubscriptionRequest request = new PlcSubscriptionRequest();
- request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3));
+ @SuppressWarnings("unchecked")
+ SubscriptionRequestCyclicItem subscriptionRequestCyclicItem = new SubscriptionRequestCyclicItem(dataType, address, TimeUnit.SECONDS, 3, this);
+ request.addItem(subscriptionRequestCyclicItem);
CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request);
subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 5fa8ac4..67c4894 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -18,10 +18,59 @@ specific language governing permissions and limitations
under the License.
*/
-import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.model.Address;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> {
+ public static PlcSubscriptionRequest.Builder builder() {
+ return new PlcSubscriptionRequest.Builder();
+ }
+
+ public static class Builder extends PlcRequest.Builder<SubscriptionRequestItem> {
+
+ public final <T> Builder addChangeOfStateItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+ // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+ //checkType(dataType);
+ requests.add(new SubscriptionRequestChangeOfStateItem<>(dataType, address, consumer));
+ return this;
+ }
+
+ public final <T> Builder addCyclicItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer, TimeUnit timeUnit, int period) {
+ // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+ //checkType(dataType);
+ requests.add(new SubscriptionRequestCyclicItem<>(dataType, address, timeUnit, period, consumer));
+ return this;
+ }
+
+ public final <T> Builder addEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+ // As we don't get a list as response rather we have individual consumers we don't need type checking here.
+ //checkType(dataType);
+ requests.add(new SubscriptionRequestEventItem<>(dataType, address, consumer));
+ return this;
+ }
+
+ public final Builder addItem(SubscriptionRequestItem subscriptionRequestItem) {
+ requests.add(subscriptionRequestItem);
+ return this;
+ }
+
+ public final PlcSubscriptionRequest build() {
+ if (requests.isEmpty()) {
+ throw new IllegalStateException("No requests added");
+ }
+ PlcSubscriptionRequest plcSubscriptionRequest = new PlcSubscriptionRequest();
+ for (SubscriptionRequestItem request : requests) {
+ plcSubscriptionRequest.addItem(request);
+ }
+ return plcSubscriptionRequest;
+ }
+
+ }
+
@Override
public String toString() {
return "PlcSubscriptionRequest{} " + super.toString();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index ed12177..bc46db7 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -18,12 +18,15 @@ under the License.
*/
package org.apache.plc4x.java.api.messages;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
public class PlcUnsubscriptionRequest implements PlcMessage {
@@ -51,6 +54,60 @@ public class PlcUnsubscriptionRequest implements PlcMessage {
return getRequestItems().size();
}
+ public static PlcUnsubscriptionRequest.Builder builder() {
+ return new PlcUnsubscriptionRequest.Builder();
+ }
+
+ public static class Builder extends PlcRequest.Builder<UnsubscriptionRequestItem> {
+
+ public final Builder addHandle(SubscriptionHandle subscriptionHandle) {
+ requests.add(new UnsubscriptionRequestItem(subscriptionHandle));
+ return this;
+ }
+
+ public final Builder addHandle(SubscriptionHandle... subscriptionHandles) {
+ requests.addAll(Arrays.stream(subscriptionHandles).map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+ return this;
+ }
+
+ public final Builder addHandle(List<SubscriptionHandle> subscriptionHandles) {
+ requests.addAll(subscriptionHandles.stream().map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+ return this;
+ }
+
+ public final Builder addHandle(SubscriptionResponseItem subscriptionResponseItem) {
+ requests.add(new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+ return this;
+ }
+
+ public final Builder addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) {
+ requests.add(unsubscriptionRequestItem);
+ return this;
+ }
+
+ public final Builder addItem(UnsubscriptionRequestItem... unsubscriptionRequestItems) {
+ requests.addAll(Arrays.asList(unsubscriptionRequestItems));
+ return this;
+ }
+
+ public final Builder addItem(List<UnsubscriptionRequestItem> unsubscriptionRequestItems) {
+ requests.addAll(unsubscriptionRequestItems);
+ return this;
+ }
+
+ public final PlcUnsubscriptionRequest build() {
+ if (requests.isEmpty()) {
+ throw new IllegalStateException("No requests added");
+ }
+ PlcUnsubscriptionRequest plcUnsubscriptionRequest = new PlcUnsubscriptionRequest();
+ for (UnsubscriptionRequestItem request : requests) {
+ plcUnsubscriptionRequest.addItem(request);
+ }
+ return plcUnsubscriptionRequest;
+ }
+
+ }
+
@Override
public String toString() {
return "PlcUnsubscriptionRequest{" +
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index 8e4b0eb..aa46b3a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -23,9 +23,9 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
import java.util.function.Consumer;
-public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem {
+public class SubscriptionRequestChangeOfStateItem<T> extends SubscriptionRequestItem<T> {
- public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) {
+ public SubscriptionRequestChangeOfStateItem(Class<T> datatype, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index 22793b2..06f3a77 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -25,13 +25,13 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+public class SubscriptionRequestCyclicItem<T> extends SubscriptionRequestItem<T> {
private TimeUnit timeUnit;
private int period;
- public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) {
- super(datatype, address, SubscriptionType.CYCLIC, consumer);
+ public SubscriptionRequestCyclicItem(Class<T> dataType, Address address, TimeUnit timeUnit, int period, Consumer<SubscriptionEventItem<T>> consumer) {
+ super(dataType, address, SubscriptionType.CYCLIC, consumer);
this.timeUnit = timeUnit;
this.period = period;
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e842d88..1abd1b4 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
import java.util.function.Consumer;
-public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+public class SubscriptionRequestEventItem<T> extends SubscriptionRequestItem<T> {
- public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) {
- super(datatype, address, SubscriptionType.EVENT, consumer);
+ public SubscriptionRequestEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+ super(dataType, address, SubscriptionType.EVENT, consumer);
}
@Override
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 3511e32..a0e28c8 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,15 +22,17 @@ import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
import org.apache.plc4x.java.api.model.Address;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
public class ManualPlc4XAdsTest {
@@ -57,20 +59,24 @@ public class ManualPlc4XAdsTest {
System.out.println("ResponseItem " + responseItem);
responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println);
- Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
- PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
- subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer));
- CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest);
- PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
- SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get();
- PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
- unsubscriptionRequest.addItem(
- new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
- CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
- plcSubscriber.unsubscribe(unsubscriptionRequest);
- PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+ PlcSubscriptionRequest subscriptionRequest = PlcSubscriptionRequest.builder()
+ .addChangeOfStateItem(Integer.class, address, plcNotification -> System.out.println("Received notification " + plcNotification))
+ .build();
+
+ SubscriptionResponseItem subscriptionResponseItem = plcSubscriber.subscribe(subscriptionRequest)
+ .get(5, TimeUnit.SECONDS)
+ .getResponseItem().orElseThrow(() -> new RuntimeException("response not available"));
+
+ TimeUnit.SECONDS.sleep(5);
+
+ PlcUnsubscriptionRequest unsubscriptionRequest = PlcUnsubscriptionRequest.builder()
+ .addHandle(subscriptionResponseItem)
+ .build();
+
+ PlcUnsubscriptionResponse unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest)
+ .get(5, TimeUnit.SECONDS);
System.out.println(unsubscriptionResponse);
}
System.exit(0);