You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 11:29:04 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-577] Let PLC adapter support preview
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 768dc0b9d2ab9ceb1234cbdd1df849bf8d41b53a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 13:28:51 2022 +0200
[STREAMPIPES-577] Let PLC adapter support preview
---
.../worker/management/GuessManagement.java | 1 -
.../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java | 148 +++++++++++++--------
.../adapters/plc4x/s7/PlcReadResponseHandler.java | 9 ++
.../event-schema/event-schema.component.html | 2 +-
4 files changed, 99 insertions(+), 61 deletions(-)
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 9bc96f9b2..362c63a42 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -60,7 +60,6 @@ public class GuessManagement {
throw new ParseException(errorClass + e.getMessage());
} catch (Exception e) {
- LOG.error("Unknown Error: " + e.toString());
throw new AdapterException(e.getMessage(), e);
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 2ee301ab1..f3e778a32 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -27,11 +27,12 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.apache.streampipes.connect.adapter.Adapter;
import org.apache.streampipes.connect.adapter.util.PollingSettings;
-import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
@@ -56,8 +57,10 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
-public class Plc4xS7Adapter extends PullAdapter {
+public class Plc4xS7Adapter extends PullAdapter implements PlcReadResponseHandler {
/**
* A unique id to identify the Plc4xS7Adapter
@@ -133,31 +136,44 @@ public class Plc4xS7Adapter extends PullAdapter {
public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
// Extract user input
- getConfigurations(adapterDescription);
+ try {
+ getConfigurations(adapterDescription);
- if (this.pollingInterval < 10) {
- throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
- }
+ if (this.pollingInterval < 10) {
+ throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
+ }
- GuessSchema guessSchema = new GuessSchema();
+ GuessSchema guessSchema = new GuessSchema();
- EventSchema eventSchema = new EventSchema();
- List<EventProperty> allProperties = new ArrayList<>();
+ EventSchema eventSchema = new EventSchema();
+ List<EventProperty> allProperties = new ArrayList<>();
- for (Map<String, String> node : this.nodes) {
- Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
-
- allProperties.add(
- PrimitivePropertyBuilder
- .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
- .label(node.get(PLC_NODE_RUNTIME_NAME))
- .description("")
- .build());
- }
+ for (Map<String, String> node : this.nodes) {
+ Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+
+ allProperties.add(
+ PrimitivePropertyBuilder
+ .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
+ .label(node.get(PLC_NODE_RUNTIME_NAME))
+ .description("")
+ .build());
+ }
- eventSchema.setEventProperties(allProperties);
- guessSchema.setEventSchema(eventSchema);
- return guessSchema;
+ var event = readPlcDataSynchronized();
+ var preview = event
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
+ new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
+
+ eventSchema.setEventProperties(allProperties);
+ guessSchema.setEventSchema(eventSchema);
+ guessSchema.setEventPreview(List.of(preview));
+
+ return guessSchema;
+ } catch (PlcConnectionException | ExecutionException | InterruptedException | TimeoutException e) {
+ throw new AdapterException(e.getMessage(), e);
+ }
}
/**
@@ -171,7 +187,6 @@ public class Plc4xS7Adapter extends PullAdapter {
this.driverManager = new PooledPlcDriverManager();
try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
-
if (!plcConnection.getMetadata().canRead()) {
this.LOG.error("The S7 on IP: " + this.ip + " does not support reading data");
}
@@ -188,49 +203,40 @@ public class Plc4xS7Adapter extends PullAdapter {
*/
@Override
protected void pullData() {
-
// Create PLC read request
- try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- for (Map<String, String> node : this.nodes) {
- builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
- }
- PlcReadRequest readRequest = builder.build();
-
- // Execute the request
- CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
-
- asyncResponse.whenComplete((response, throwable) -> {
- // Create an event containing the value of the PLC
- if (throwable != null) {
- throwable.printStackTrace();
- this.LOG.error(throwable.getMessage());
- } else {
- Map<String, Object> event = new HashMap<>();
- for (Map<String, String> node : this.nodes) {
- if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
- event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
- } else {
- this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
- response.getResponseCode(node.get(PLC_NODE_NAME)).name());
- }
- }
-
- // publish the final event
- adapterPipeline.process(event);
- }
- });
-
- } catch (InterruptedException | ExecutionException e) {
- this.LOG.error(e.getMessage());
- e.printStackTrace();
- } catch (Exception e) {
+ try {
+ readPlcData(this);
+ } catch (PlcConnectionException e) {
this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
e.printStackTrace();
}
}
+ private PlcReadRequest makeReadRequest() throws PlcConnectionException {
+ PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip);
+ PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+ for (Map<String, String> node : this.nodes) {
+ builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+ }
+ return builder.build();
+ }
+
+ private void readPlcData(PlcReadResponseHandler handler) throws PlcConnectionException {
+ var readRequest = makeReadRequest();
+ // Execute the request
+ CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+ asyncResponse.whenComplete(handler::onReadResult);
+ }
+
+ private Map<String, Object> readPlcDataSynchronized() throws PlcConnectionException, ExecutionException, InterruptedException, TimeoutException, AdapterException {
+ this.before();
+ var readRequest = makeReadRequest();
+ // Execute the request
+ var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
+ return makeEvent(readResponse);
+ }
+
/**
* Define the polling interval of this adapter. Default is to poll every second
* @return
@@ -315,4 +321,28 @@ public class Plc4xS7Adapter extends PullAdapter {
}
}
+ @Override
+ public void onReadResult(PlcReadResponse response, Throwable throwable) {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ this.LOG.error(throwable.getMessage());
+ } else {
+ var event = makeEvent(response);
+ // publish the final event
+ adapterPipeline.process(event);
+ }
+ }
+
+ private Map<String, Object> makeEvent(PlcReadResponse response) {
+ Map<String, Object> event = new HashMap<>();
+ for (Map<String, String> node : this.nodes) {
+ if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
+ event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+ } else {
+ this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
+ response.getResponseCode(node.get(PLC_NODE_NAME)).name());
+ }
+ }
+ return event;
+ }
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
new file mode 100644
index 000000000..071f27c5c
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
@@ -0,0 +1,9 @@
+package org.apache.streampipes.connect.iiot.adapters.plc4x.s7;
+
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public interface PlcReadResponseHandler {
+
+ void onReadResult(PlcReadResponse response,
+ Throwable throwable);
+}
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
index 6709dad75..a478d6fbb 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
@@ -32,7 +32,7 @@
</div>
</div>
- <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && schemaErrorHints.length > 0">
+ <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && !isError && schemaErrorHints.length > 0">
<div fxFlex="100"
fxLayout="column"
[ngClass]="schemaErrorHint.level === 'error' ? 'schema-validation schema-validation-error' : 'schema-validation schema-validation-warning'"