You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/11/19 10:40:59 UTC
[incubator-streampipes-extensions] branch dev updated: Add async
call for plc4x adapter
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f3b630 Add async call for plc4x adapter
4f3b630 is described below
commit 4f3b6308eec7bed36ebe77a85e49824f8edef0c4
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Nov 19 11:40:19 2020 +0100
Add async call for plc4x adapter
---
.../connect/adapters/plc4x/s7/Plc4xS7Adapter.java | 43 ++++++++++++----------
1 file changed, 24 insertions(+), 19 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
index 13672bd..91de8db 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -221,32 +222,36 @@ public class Plc4xS7Adapter extends PullAdapter {
PlcReadRequest readRequest = builder.build();
// Execute the request
- PlcReadResponse response = null;
- response = readRequest.execute().get();
+ CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+ asyncResponse.whenComplete((response, throwable) -> {
// Create an event containing the value of the PLC
- Map<String, Object> event = new HashMap<>();
- for (Map<String, String> node : this.nodes) {
- if(response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
- event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+ if (throwable != null) {
+ throwable.printStackTrace();
+ this.LOG.error(throwable.getMessage());
+ } else {
+ Map<String, Object> event = new HashMap<>();
+ for (Map<String, String> node : this.nodes) {
+ if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
+ event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+ } else {
+ logger.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
+ response.getResponseCode(node.get(PLC_NODE_NAME)).name());
+ }
}
- else {
- logger.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
- response.getResponseCode(node.get(PLC_NODE_NAME)).name());
- }
+ // publish the final event
+ adapterPipeline.process(event);
}
+ });
- // publish the final event
- adapterPipeline.process(event);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e.getMessage());
- e.printStackTrace();
- } catch (Exception e) {
- System.out.println("Could not establish connection to S7 with ip " + this.ip);
- this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
- e.printStackTrace();
- }
+ this.LOG.error(e.getMessage());
+ e.printStackTrace();
+ } catch (Exception e) {
+ this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
+ e.printStackTrace();
+ }
}