You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/11/19 10:40:59 UTC

[incubator-streampipes-extensions] branch dev updated: Add async call for plc4x adapter

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f3b630  Add async call for plc4x adapter
4f3b630 is described below

commit 4f3b6308eec7bed36ebe77a85e49824f8edef0c4
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Nov 19 11:40:19 2020 +0100

    Add async call for plc4x adapter
---
 .../connect/adapters/plc4x/s7/Plc4xS7Adapter.java  | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
index 13672bd..91de8db 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -221,32 +222,36 @@ public class Plc4xS7Adapter extends PullAdapter {
             PlcReadRequest readRequest = builder.build();
 
             // Execute the request
-            PlcReadResponse response = null;
-                response = readRequest.execute().get();
+            CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
 
+            asyncResponse.whenComplete((response, throwable) -> {
                 // Create an event containing the value of the PLC
-                Map<String, Object> event = new HashMap<>();
-                for (Map<String, String> node : this.nodes) {
-                    if(response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
-                        event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+                if (throwable != null) {
+                    throwable.printStackTrace();
+                    this.LOG.error(throwable.getMessage());
+                } else {
+                    Map<String, Object> event = new HashMap<>();
+                    for (Map<String, String> node : this.nodes) {
+                        if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
+                            event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+                        } else {
+                            logger.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
+                                    response.getResponseCode(node.get(PLC_NODE_NAME)).name());
+                        }
                     }
 
-                    else {
-                        logger.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
-                                response.getResponseCode(node.get(PLC_NODE_NAME)).name());
-                    }
+                    // publish the final event
+                    adapterPipeline.process(event);
                 }
+            });
 
-                // publish the final event
-                adapterPipeline.process(event);
         } catch (InterruptedException | ExecutionException e) {
-                LOG.error(e.getMessage());
-                e.printStackTrace();
-            } catch (Exception e) {
-                System.out.println("Could not establish connection to S7 with ip " + this.ip);
-                this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
-                e.printStackTrace();
-            }
+            this.LOG.error(e.getMessage());
+            e.printStackTrace();
+        } catch (Exception e) {
+            this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
+            e.printStackTrace();
+        }
 
     }