You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 11:29:04 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-577] Let PLC adapter support preview

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 768dc0b9d2ab9ceb1234cbdd1df849bf8d41b53a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 13:28:51 2022 +0200

    [STREAMPIPES-577] Let PLC adapter support preview
---
 .../worker/management/GuessManagement.java         |   1 -
 .../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java     | 148 +++++++++++++--------
 .../adapters/plc4x/s7/PlcReadResponseHandler.java  |   9 ++
 .../event-schema/event-schema.component.html       |   2 +-
 4 files changed, 99 insertions(+), 61 deletions(-)

diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 9bc96f9b2..362c63a42 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -60,7 +60,6 @@ public class GuessManagement {
 
             throw new ParseException(errorClass + e.getMessage());
         } catch (Exception e) {
-            LOG.error("Unknown Error: " + e.toString());
             throw new AdapterException(e.getMessage(), e);
         }
 
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 2ee301ab1..f3e778a32 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -27,11 +27,12 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.util.PollingSettings;
-import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
 import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
@@ -56,8 +57,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
-public class Plc4xS7Adapter extends PullAdapter {
+public class Plc4xS7Adapter extends PullAdapter implements PlcReadResponseHandler {
 
     /**
      * A unique id to identify the Plc4xS7Adapter
@@ -133,31 +136,44 @@ public class Plc4xS7Adapter extends PullAdapter {
     public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
 
         // Extract user input
-        getConfigurations(adapterDescription);
+        try {
+            getConfigurations(adapterDescription);
 
-        if (this.pollingInterval < 10) {
-            throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
-        }
+            if (this.pollingInterval < 10) {
+                throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
+            }
 
-        GuessSchema guessSchema = new GuessSchema();
+            GuessSchema guessSchema = new GuessSchema();
 
-        EventSchema eventSchema = new EventSchema();
-        List<EventProperty> allProperties = new ArrayList<>();
+            EventSchema eventSchema = new EventSchema();
+            List<EventProperty> allProperties = new ArrayList<>();
 
-        for (Map<String, String> node : this.nodes) {
-            Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
-
-            allProperties.add(
-                    PrimitivePropertyBuilder
-                            .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
-                            .label(node.get(PLC_NODE_RUNTIME_NAME))
-                            .description("")
-                            .build());
-        }
+            for (Map<String, String> node : this.nodes) {
+                Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+
+                allProperties.add(
+                  PrimitivePropertyBuilder
+                    .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
+                    .label(node.get(PLC_NODE_RUNTIME_NAME))
+                    .description("")
+                    .build());
+            }
 
-        eventSchema.setEventProperties(allProperties);
-        guessSchema.setEventSchema(eventSchema);
-        return guessSchema;
+            var event = readPlcDataSynchronized();
+            var preview = event
+              .entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e ->
+                new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
+
+            eventSchema.setEventProperties(allProperties);
+            guessSchema.setEventSchema(eventSchema);
+            guessSchema.setEventPreview(List.of(preview));
+
+            return guessSchema;
+        } catch (PlcConnectionException | ExecutionException | InterruptedException | TimeoutException e) {
+            throw new AdapterException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -171,7 +187,6 @@ public class Plc4xS7Adapter extends PullAdapter {
 
         this.driverManager = new PooledPlcDriverManager();
         try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
-
             if (!plcConnection.getMetadata().canRead()) {
                 this.LOG.error("The S7 on IP: " + this.ip + " does not support reading data");
             }
@@ -188,49 +203,40 @@ public class Plc4xS7Adapter extends PullAdapter {
      */
     @Override
     protected void pullData() {
-
         // Create PLC read request
-        try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
-            PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-            for (Map<String, String> node : this.nodes) {
-                builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
-            }
-            PlcReadRequest readRequest = builder.build();
-
-            // Execute the request
-            CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
-
-            asyncResponse.whenComplete((response, throwable) -> {
-                // Create an event containing the value of the PLC
-                if (throwable != null) {
-                    throwable.printStackTrace();
-                    this.LOG.error(throwable.getMessage());
-                } else {
-                    Map<String, Object> event = new HashMap<>();
-                    for (Map<String, String> node : this.nodes) {
-                        if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
-                            event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
-                        } else {
-                            this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
-                                    response.getResponseCode(node.get(PLC_NODE_NAME)).name());
-                        }
-                    }
-
-                    // publish the final event
-                    adapterPipeline.process(event);
-                }
-            });
-
-        } catch (InterruptedException | ExecutionException e) {
-            this.LOG.error(e.getMessage());
-            e.printStackTrace();
-        } catch (Exception e) {
+        try {
+            readPlcData(this);
+        } catch (PlcConnectionException e) {
             this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
             e.printStackTrace();
         }
 
     }
 
+    private PlcReadRequest makeReadRequest() throws PlcConnectionException {
+        PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip);
+        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+        for (Map<String, String> node : this.nodes) {
+            builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+        }
+        return builder.build();
+    }
+
+    private void readPlcData(PlcReadResponseHandler handler) throws PlcConnectionException {
+        var readRequest = makeReadRequest();
+        // Execute the request
+        CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+        asyncResponse.whenComplete(handler::onReadResult);
+    }
+
+    private Map<String, Object> readPlcDataSynchronized() throws PlcConnectionException, ExecutionException, InterruptedException, TimeoutException, AdapterException {
+        this.before();
+        var readRequest = makeReadRequest();
+        // Execute the request
+        var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
+        return makeEvent(readResponse);
+    }
+
     /**
      * Define the polling interval of this adapter. Default is to poll every second
      * @return
@@ -315,4 +321,28 @@ public class Plc4xS7Adapter extends PullAdapter {
         }
     }
 
+    @Override
+    public void onReadResult(PlcReadResponse response, Throwable throwable) {
+        if (throwable != null) {
+            throwable.printStackTrace();
+            this.LOG.error(throwable.getMessage());
+        } else {
+            var event = makeEvent(response);
+            // publish the final event
+            adapterPipeline.process(event);
+        }
+    }
+
+    private Map<String, Object> makeEvent(PlcReadResponse response) {
+        Map<String, Object> event = new HashMap<>();
+        for (Map<String, String> node : this.nodes) {
+            if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
+                event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+            } else {
+                this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
+                  response.getResponseCode(node.get(PLC_NODE_NAME)).name());
+            }
+        }
+        return event;
+    }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
new file mode 100644
index 000000000..071f27c5c
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
@@ -0,0 +1,9 @@
+package org.apache.streampipes.connect.iiot.adapters.plc4x.s7;
+
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public interface PlcReadResponseHandler {
+
+  void onReadResult(PlcReadResponse response,
+                    Throwable throwable);
+}
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
index 6709dad75..a478d6fbb 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
@@ -32,7 +32,7 @@
             </div>
         </div>
 
-        <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && schemaErrorHints.length > 0">
+        <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && !isError && schemaErrorHints.length > 0">
             <div fxFlex="100"
                  fxLayout="column"
                  [ngClass]="schemaErrorHint.level === 'error' ? 'schema-validation schema-validation-error' : 'schema-validation schema-validation-warning'"