You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:18 UTC
[incubator-plc4x] 11/19: adjusted connectors and examples to the
new api
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit eb079f92a09404df93545c436cc2ff5d71353918
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Oct 4 15:54:51 2018 +0200
adjusted connectors and examples to the new api
---
.../azure/iothub/S7PlcToAzureIoTHubSample.java | 5 +---
.../google/iotcore/S7PlcToGoogleIoTCoreSample.java | 5 +---
.../plc4x/java/examples/helloplc4x/HelloPlc4x.java | 22 +++++++-------
.../java/org/apache/plc4x/camel/Plc4XConsumer.java | 17 +++--------
.../apache/plc4x/camel/Plc4XPollingConsumer.java | 14 +++++----
.../java/org/apache/plc4x/camel/Plc4XProducer.java | 4 +--
.../java/org/apache/plc4x/camel/MockDriver.java | 11 ++-----
.../org/apache/plc4x/camel/Plc4XProducerTest.java | 3 --
.../apache/plc4x/edgent/PlcConnectionAdapter.java | 14 ++-------
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 9 +++---
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 23 +++++++--------
.../org/apache/plc4x/nifi/Plc4xSinkProcessor.java | 8 ++---
.../apache/plc4x/nifi/Plc4xSourceProcessor.java | 24 +++++++--------
.../plc4x/java/api/connection/PlcConnection.java | 8 -----
.../plc4x/java/api/connection/PlcReader.java | 2 --
.../plc4x/java/api/connection/PlcSubscriber.java | 4 ---
.../plc4x/java/api/connection/PlcWriter.java | 2 --
.../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 13 ++++-----
.../base/connection/AbstractPlcConnection.java | 28 ------------------
.../java/ethernetip/ManualPlc4XEtherNetIpTest.java | 8 ++---
.../plc4x/java/modbus/ManualPlc4XModbusTest.java | 34 +++++++++-------------
.../org/apache/plc4x/java/test/TestConnection.java | 16 ----------
22 files changed, 83 insertions(+), 191 deletions(-)
diff --git a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
index a4f80cc..45ac6c6 100644
--- a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
+++ b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
@@ -23,7 +23,6 @@ import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.Message;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.slf4j.Logger;
@@ -62,15 +61,13 @@ public class S7PlcToAzureIoTHubSample {
DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT);
client.open();
- // Get a reader instance.
- PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new);
// Prepare a read request.
PlcReadRequest request = plcConnection.readRequestBuilder().get().addItem(FIELD_NAME, addressString).build();
while (!Thread.currentThread().isInterrupted()) {
// Simulate telemetry.
- PlcReadResponse response = plcReader.read(request).get();
+ PlcReadResponse response = request.execute().get();
response.getAllLongs(FIELD_NAME)
.forEach(longValue -> {
String result = Long.toBinaryString(longValue);
diff --git a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
index fba969c..c6aac9b 100644
--- a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
+++ b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
@@ -23,7 +23,6 @@ import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.eclipse.paho.client.mqttv3.*;
@@ -233,13 +232,11 @@ public class S7PlcToGoogleIoTCoreSample {
try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) {
logger.info("Connected");
- PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalAccessError::new);
-
PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("outputs", "OUTPUTS/0").build();
while (!Thread.currentThread().isInterrupted()) {
- PlcReadResponse plcReadResponse = plcReader.read(readRequest).get();
+ PlcReadResponse plcReadResponse = readRequest.execute().get();
// Refresh the connection credentials before the JWT expires.
// [START iot_mqtt_jwt_refresh]
diff --git a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 402c856..05190e9 100644
--- a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -20,14 +20,12 @@ package org.apache.plc4x.java.examples.helloplc4x;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class HelloPlc4x {
@@ -49,34 +47,36 @@ public class HelloPlc4x {
// Establish a connection to the plc using the url provided as first argument
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(args[0])) {
- Optional<PlcReader> reader = plcConnection.getReader();
-
// Check if this connection support reading of data.
- if (reader.isPresent()) {
- PlcReader plcReader = reader.get();
+ if (plcConnection.readRequestBuilder().isPresent()) {
// Create a new read request:
// - Give the single item requested the alias name "value"
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
+ PlcReadRequest.Builder syncBuilder = plcConnection.readRequestBuilder().get();
for (int i = 1; i < args.length; i++) {
- builder.addItem("value-" + i, args[i]);
+ syncBuilder.addItem("value-" + i, args[i]);
}
- PlcReadRequest plcReadRequest = builder.build();
+ PlcReadRequest syncPlcReadRequest = syncBuilder.build();
//////////////////////////////////////////////////////////
// Read synchronously ...
// NOTICE: the ".get()" immediately lets this thread pause till
// the response is processed and available.
System.out.println("\nSynchronous request ...");
- PlcReadResponse syncResponse = plcReader.read(plcReadRequest).get();
+ PlcReadResponse syncResponse = syncPlcReadRequest.execute().get();
// Simply iterating over the field names returned in the response.
printResponse(syncResponse);
//////////////////////////////////////////////////////////
// Read asynchronously ...
// Register a callback executed as soon as a response arives.
+ PlcReadRequest.Builder asyncBuilder = plcConnection.readRequestBuilder().get();
+ for (int i = 1; i < args.length; i++) {
+ asyncBuilder.addItem("value-" + i, args[i]);
+ }
+ PlcReadRequest asyncPlcReadRequest = asyncBuilder.build();
System.out.println("\n\nAsynchronous request ...");
- CompletableFuture<PlcReadResponse> asyncResponse = plcReader.read(plcReadRequest);
+ CompletableFuture<? extends PlcReadResponse> asyncResponse = asyncPlcReadRequest.execute();
asyncResponse.whenComplete((readResponse, throwable) -> {
if (readResponse != null) {
printResponse(syncResponse);
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 9ec281c..e2f983c 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -24,7 +24,6 @@ import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.*;
import org.slf4j.Logger;
@@ -49,7 +48,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
private Class<?> dataType;
private PlcSubscriptionResponse subscriptionResponse;
-
public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
this.endpoint = endpoint;
this.dataType = endpoint.getDataType();
@@ -80,22 +78,19 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
@Override
protected void doStart() throws InterruptedException, ExecutionException, PlcException {
- PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
- () -> new PlcException("Connection doesn't support subscriptions."));
// TODO: Is it correct to only support one field?
PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder().get()
.addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build();
- PlcSubscriptionResponse plcSubscriptionResponse = plcSubscriber.subscribe(request).get();
+ subscriptionResponse = request.execute().get();
// TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
- plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
+ // TODO: figure out what to do with this
+ // plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
}
@Override
protected void doStop() throws InterruptedException, ExecutionException, TimeoutException, PlcException {
- PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
- () -> new PlcException("Connection doesn't support subscriptions."));
PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().get().addHandles(subscriptionResponse.getSubscriptionHandles()).build();
- CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = plcSubscriber.unsubscribe(request);
+ CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute();
PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
// TODO: Handle the response ...
try {
@@ -105,10 +100,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
}
}
- private PlcSubscriber getSubscriber() {
- return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available"));
- }
-
@Override
public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
LOGGER.debug("Received {}", plcSubscriptionEvent);
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index 4aee7ad..3e90b44 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@ -44,8 +44,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
private Plc4XEndpoint endpoint;
private ExceptionHandler exceptionHandler;
private PlcConnection plcConnection;
- private PlcReader plcReader;
- private PlcReadRequest readRequest;
+ private PlcReadRequest.Builder requestBuilder;
private Class dataType;
public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
@@ -54,8 +53,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
- this.plcReader = plcConnection.getReader().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
- readRequest = plcConnection.readRequestBuilder().get().addItem("default", endpoint.getAddress()).build();
+ this.requestBuilder = plcConnection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
}
@Override
@@ -79,7 +77,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
@Override
public Exchange receive() {
Exchange exchange = endpoint.createExchange();
- CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+ CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
try {
PlcReadResponse plcReadResponse = read.get();
exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -97,7 +95,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
@Override
public Exchange receive(long timeout) {
Exchange exchange = endpoint.createExchange();
- CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+ CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
try {
PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS);
exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -121,6 +119,10 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
}
}
+ private PlcReadRequest createReadRequest() {
+ return requestBuilder.addItem("default", endpoint.getAddress()).build();
+ }
+
private Object unwrapIfSingle(Collection collection) {
if (collection.isEmpty()) {
return null;
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index b4adbcc..f43e603 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@ -23,7 +23,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
@@ -60,9 +59,8 @@ public class Plc4XProducer extends DefaultAsyncProducer {
Object value = in.getBody(Object.class);
// builder.addItem(fieldName, fieldQuery, value);
}
- PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
- CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriter.write(builder.build());
+ CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
int currentlyOpenRequests = openRequests.incrementAndGet();
try {
log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index 9a3ea9e..2be45b2 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -21,13 +21,8 @@ package org.apache.plc4x.camel;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.base.messages.DefaultPlcSubscriptionResponse;
import org.apache.plc4x.java.base.messages.InternalPlcSubscriptionRequest;
import org.slf4j.Logger;
@@ -61,11 +56,10 @@ public class MockDriver implements PlcDriver {
public PlcConnection connect(String url) {
// Mock a connection.
PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
- when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
- when(plcConnectionMock.getReader()).thenReturn(Optional.of(mock(PlcReader.class, RETURNS_DEEP_STUBS)));
when(plcConnectionMock.readRequestBuilder()).thenReturn(Optional.of(mock(PlcReadRequest.Builder.class, RETURNS_DEEP_STUBS)));
when(plcConnectionMock.writeRequestBuilder()).thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
when(plcConnectionMock.subscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcSubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
+ when(plcConnectionMock.unsubscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcUnsubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
// Mock a typical subscriber.
PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
@@ -97,7 +91,6 @@ public class MockDriver implements PlcDriver {
responseFuture.complete(response);
return responseFuture;
});
- when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
return plcConnectionMock;
}
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
index 60706d6..0068a11 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
@@ -48,9 +48,6 @@ public class Plc4XProducerTest {
when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS);
- when(plcDriverManagerMock.getConnection(anyString()).getWriter())
- .thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
-
when(plcDriverManagerMock.getConnection(anyString()).writeRequestBuilder())
.thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
index 4e1fbcd..e3e23cc 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
@@ -24,8 +24,6 @@ import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -117,9 +115,7 @@ public class PlcConnectionAdapter implements AutoCloseable {
PlcConnection connection = null;
try {
connection = getConnection();
- PlcReader reader = connection.getReader()
- .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
- return reader.read(readRequest).get();
+ return readRequest.execute().get();
} catch (Exception e) {
logger.error("reading from plc device {} {} failed", connection, readRequest, e);
return null;
@@ -154,10 +150,8 @@ public class PlcConnectionAdapter implements AutoCloseable {
PlcField field = null;
try {
connection = getConnection();
- PlcReader reader = connection.getReader()
- .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
PlcReadRequest readRequest = connection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading")).addItem(FIELD_NAME, fieldQuery).build();
- PlcReadResponse readResponse = reader.read(readRequest).get();
+ PlcReadResponse readResponse = readRequest.execute().get();
Object value = null;
switch (clientDatatype) {
case BYTE:
@@ -219,12 +213,10 @@ public class PlcConnectionAdapter implements AutoCloseable {
PlcConnection connection = null;
try {
connection = getConnection();
- PlcWriter writer = connection.getWriter()
- .orElseThrow(() -> new PlcException("This connection doesn't support writing"));
PlcWriteRequest.Builder builder = connection.writeRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support writing"));
PlcWriteRequest writeRequest = builder.build();
addItem(builder, clientDatatype, fieldQuery, fieldValue);
- writer.write(writeRequest).get();
+ writeRequest.execute().get();
} catch (Exception e) {
logger.error("writing to plc device {} {} failed", connection, fieldQuery, e);
}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 6829294..7459ac5 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.kafka.util.VersionUtil;
@@ -37,7 +36,6 @@ public class Plc4xSinkTask extends SinkTask {
private String url;
private PlcConnection plcConnection;
- private PlcWriter plcWriter;
@Override
public String version() {
@@ -51,8 +49,9 @@ public class Plc4xSinkTask extends SinkTask {
openConnection();
- plcWriter = plcConnection.getWriter()
- .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
+ if (!plcConnection.writeRequestBuilder().isPresent()) {
+ throw new ConnectException("Writing not supported on this connection");
+ }
}
@Override
@@ -107,7 +106,7 @@ public class Plc4xSinkTask extends SinkTask {
private void doWrite(PlcWriteRequest request) {
try {
- plcWriter.write(request).get();
+ request.execute().get();
} catch (ExecutionException | InterruptedException e) {
throw new ConnectException("Caught exception during write", e);
}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 08b3ec1..7a2b21b 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -61,8 +60,6 @@ public class Plc4xSourceTask extends SourceTask {
private List<String> queries;
private PlcConnection plcConnection;
- private PlcReader plcReader;
- private PlcReadRequest plcRequest;
// TODO: should we use shared (static) thread pool for this?
private ScheduledExecutorService scheduler;
@@ -82,15 +79,9 @@ public class Plc4xSourceTask extends SourceTask {
openConnection();
- plcReader = plcConnection.getReader()
- .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
-
-
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
- for (String query : queries) {
- builder.addItem(query, query);
+ if (!plcConnection.readRequestBuilder().isPresent()) {
+ throw new ConnectException("Reading not supported on this connection");
}
- plcRequest = builder.build();
int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
scheduler = Executors.newScheduledThreadPool(1);
@@ -154,7 +145,7 @@ public class Plc4xSourceTask extends SourceTask {
}
private List<SourceRecord> doFetch() throws InterruptedException {
- final CompletableFuture<PlcReadResponse> response = plcReader.read(plcRequest);
+ final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute();
try {
final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
return extractValues(received);
@@ -165,6 +156,14 @@ public class Plc4xSourceTask extends SourceTask {
}
}
+ private PlcReadRequest createReadRequest() {
+ PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
+ for (String query : queries) {
+ builder.addItem(query, query);
+ }
+ return builder.build();
+ }
+
private List<SourceRecord> extractValues(PlcReadResponse response) {
final List<SourceRecord> result = new LinkedList<>();
for (String query : queries) {
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 973855b..a72ea23 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -29,7 +29,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
@@ -53,8 +52,9 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
// Get an instance of a component able to write to a PLC.
PlcConnection connection = getConnection();
- PlcWriter writer = connection.getWriter().orElseThrow(
- () -> new ProcessException("Writing not supported by connection"));
+ if (!connection.writeRequestBuilder().isPresent()) {
+ throw new ProcessException("Writing not supported by connection");
+ }
// Prepare the request.
PlcWriteRequest.Builder builder = connection.writeRequestBuilder().get();
@@ -67,7 +67,7 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
PlcWriteRequest writeRequest = builder.build();
// Send the request to the PLC.
- CompletableFuture<PlcWriteResponse> future = writer.write(writeRequest);
+ CompletableFuture<? extends PlcWriteResponse> future = writeRequest.execute();
future.whenComplete((response, throwable) -> {
if (throwable != null) {
session.transfer(session.create(), FAILURE);
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 190a00d..7254e90 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.json.simple.JSONObject;
@@ -47,23 +46,24 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Get an instance of a component able to read from a PLC.
PlcConnection connection = getConnection();
- PlcReader reader = connection.getReader().orElseThrow(
- () -> new ProcessException("Writing not supported by connection"));
// Prepare the request.
- PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
- getFields().forEach(field -> {
- String address = getAddress(field);
- if(address != null) {
- builder.addItem(field, address);
- }
- });
- PlcReadRequest readRequest = builder.build();
+ if (!connection.readRequestBuilder().isPresent()) {
+ throw new ProcessException("Writing not supported by connection");
+ }
FlowFile flowFile = session.create();
session.append(flowFile, out -> {
try {
- PlcReadResponse response = reader.read(readRequest).get();
+ PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
+ getFields().forEach(field -> {
+ String address = getAddress(field);
+ if(address != null) {
+ builder.addItem(field, address);
+ }
+ });
+ PlcReadRequest readRequest = builder.build();
+ PlcReadResponse response = readRequest.execute().get();
JSONObject obj = new JSONObject();
for (String fieldName : response.getFieldNames()) {
for(int i = 0; i < response.getNumberOfValues(fieldName); i++) {
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
index 66a3778..a6da03d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
@@ -65,12 +65,4 @@ public interface PlcConnection extends AutoCloseable {
Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder();
- // the following methods should be moved to the SPI
-
- Optional<PlcReader> getReader();
-
- Optional<PlcWriter> getWriter();
-
- Optional<PlcSubscriber> getSubscriber();
-
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
index 6d50da0..133868d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
@@ -37,6 +37,4 @@ public interface PlcReader {
*/
CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest);
- //PlcReadRequest.Builder readRequestBuilder();
-
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index d1c97ba..4a3f6cc 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -62,8 +62,4 @@ public interface PlcSubscriber {
*/
void unregister(PlcConsumerRegistration registration);
- //PlcSubscriptionRequest.Builder subscriptionRequestBuilder();
-
- //PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder();
-
}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
index a4e44bf..db52de6 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
@@ -37,6 +37,4 @@ public interface PlcWriter {
*/
CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest);
- //PlcWriteRequest.Builder writeRequestBuilder();
-
}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 6e99861..283663a 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -43,22 +43,19 @@ public class ManualPlc4XAdsTest {
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
System.out.println("PlcConnection " + plcConnection);
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build();
- CompletableFuture<PlcReadResponse> response = reader.read(readRequest);
+ CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
PlcReadResponse readResponse = response.get();
System.out.println("Response " + readResponse);
Collection<Integer> stations = readResponse.getAllIntegers("station");
stations.forEach(System.out::println);
- PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-
PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
- CompletableFuture<PlcSubscriptionResponse> subscribeResponse = plcSubscriber.subscribe(subscriptionRequest);
+ CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute();
PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get();
- PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
+ // TODO: figure out what to do with this
+ /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
TimeUnit.SECONDS.sleep(5);
@@ -68,7 +65,7 @@ public class ManualPlc4XAdsTest {
unsubscriptionResponse
.get(5, TimeUnit.SECONDS);
- System.out.println(unsubscriptionResponse);
+ System.out.println(unsubscriptionResponse);*/
}
System.exit(0);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
index 8dcedd8..28be93f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
@@ -23,14 +23,10 @@ import io.netty.channel.ChannelHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -115,30 +111,6 @@ public abstract class AbstractPlcConnection implements PlcConnection {
// Implemented in sub-classes, if needed.
}
- @Override
- public Optional<PlcReader> getReader() {
- if (this instanceof PlcReader) {
- return Optional.of((PlcReader) this);
- }
- return Optional.empty();
- }
-
- @Override
- public Optional<PlcWriter> getWriter() {
- if (this instanceof PlcWriter) {
- return Optional.of((PlcWriter) this);
- }
- return Optional.empty();
- }
-
- @Override
- public Optional<PlcSubscriber> getSubscriber() {
- if (this instanceof PlcSubscriber) {
- return Optional.of((PlcSubscriber) this);
- }
- return Optional.empty();
- }
-
/**
* Can be used to check and cast a parameter to its required internal type (can be used for general type checking too).
*
diff --git a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
index 949eaeb..95cc511 100644
--- a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
+++ b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.java.ethernetip;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -34,14 +33,11 @@ public class ManualPlc4XEtherNetIpTest {
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
System.out.println("PlcConnection " + plcConnection);
- // Get a reader instance.
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get()
+ PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("Reading not supported"))
.addItem("field", "#4#105#3").build();
// Execute the read operation.
- CompletableFuture<PlcReadResponse> response = reader.read(readRequest);
+ CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
PlcReadResponse readResponse = response.get();
// Output the response.
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
index 3f6c786..910942a 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
@@ -21,8 +21,6 @@ package org.apache.plc4x.java.modbus;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
@@ -50,10 +48,9 @@ public class ManualPlc4XModbusTest {
System.out.println("PlcConnection " + plcConnection);
{
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:7[3]").build();
- PlcReadResponse readResponse = reader.read(readRequest).get();
+ PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+ .addItem("randomRegister", "register:7[3]").build();
+ PlcReadResponse readResponse = readRequest.execute().get();
System.out.println("Response " + readResponse);
readResponse.getAllByteArrays("randomRegister").stream()
.map(HexUtil::toHex)
@@ -63,11 +60,11 @@ public class ManualPlc4XModbusTest {
{
// Read an int from 2 registers
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
// Just dump the actual values
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:3[2]").build();
- PlcReadResponse readResponse = reader.read(readRequest).get();
+ PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+ .addItem("randomRegister", "register:3[2]").build();
+ PlcReadResponse readResponse = readRequest.execute().get();
System.out.println("Response " + readResponse);
Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister");
randomRegisters.stream()
@@ -87,17 +84,16 @@ public class ManualPlc4XModbusTest {
{
// Read an int from 2 registers and multiple requests
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
// Just dump the actual values
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get()
+ PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
.addItem("randomRegister1", "register:1[2]")
.addItem("randomRegister2", "register:10[3]")
.addItem("randomRegister3", "register:20[4]")
.addItem("randomRegister4", "register:30[5]")
.addItem("randomRegister5", "register:40[6]")
.build();
- PlcReadResponse readResponse = reader.read(readRequest).get();
+ PlcReadResponse readResponse = readRequest.execute().get();
System.out.println("Response " + readResponse);
IntStream.range(1, 6).forEach(i -> {
Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister" + i);
@@ -118,10 +114,9 @@ public class ManualPlc4XModbusTest {
}
{
- PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
- PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomCoil", "coil:1[9]").build();
- PlcReadResponse readResponse = reader.read(readRequest).get();
+ PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+ .addItem("randomCoil", "coil:1[9]").build();
+ PlcReadResponse readResponse = readRequest.execute().get();
System.out.println("Response " + readResponse);
readResponse.getAllBooleans("randomCoil").stream()
.map(hex -> "Coil Value: " + hex)
@@ -129,10 +124,9 @@ public class ManualPlc4XModbusTest {
}
{
- PlcWriter writer = plcConnection.getWriter().orElseThrow(() -> new RuntimeException("No Writer found"));
-
- PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().get().addItem("randomCoilField", "coil:1", true).build();
- PlcWriteResponse writeResponse = writer.write(writeRequest).get();
+ PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().orElseThrow(() -> new RuntimeException("No Writer found"))
+ .addItem("randomCoilField", "coil:1", true).build();
+ PlcWriteResponse writeResponse = writeRequest.execute().get();
System.out.println("Response " + writeResponse);
}
} catch (Exception e) {
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
index 68b1ba3..4c5764b 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -62,21 +61,6 @@ class TestConnection implements PlcConnection, PlcReader, PlcWriter {
}
@Override
- public Optional<PlcReader> getReader() {
- return Optional.of(this);
- }
-
- @Override
- public Optional<PlcWriter> getWriter() {
- return Optional.of(this);
- }
-
- @Override
- public Optional<PlcSubscriber> getSubscriber() {
- return Optional.empty(); // TODO: implement this
- }
-
- @Override
public Optional<PlcReadRequest.Builder> readRequestBuilder() {
return Optional.of(new DefaultPlcReadRequest.Builder(this, new TestFieldHandler()));
}