You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 11:29:02 UTC

[incubator-streampipes] branch STREAMPIPES-577 updated (1dede1727 -> 768dc0b9d)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


    from 1dede1727 [STREAMPIPES-577] More adapters support preview, improve preview appearance
     new eeb30dae5 [STREAMPIPES-577] Improve error handling of adapters
     new 768dc0b9d [STREAMPIPES-577] Let PLC adapter support preview

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/management/GuessManagement.java         |  12 +-
 .../worker/management/GuessManagement.java         |   3 +-
 .../container/worker/rest/GuessResource.java       |  10 +-
 .../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java     | 148 +++++++++++++--------
 .../adapters/plc4x/s7/PlcReadResponseHandler.java  |   9 ++
 .../iiot/protocol/stream/KafkaProtocol.java        |  41 ++++--
 .../event-schema/event-schema.component.html       |   2 +-
 .../event-schema/event-schema.component.ts         |   2 +-
 .../static-property.component.html                 |   2 +
 ...c-runtime-resolvable-oneof-input.component.html |  24 +---
 ...tic-runtime-resolvable-oneof-input.component.ts |  15 +++
 11 files changed, 160 insertions(+), 108 deletions(-)
 create mode 100644 streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java


[incubator-streampipes] 02/02: [STREAMPIPES-577] Let PLC adapter support preview

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 768dc0b9d2ab9ceb1234cbdd1df849bf8d41b53a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 13:28:51 2022 +0200

    [STREAMPIPES-577] Let PLC adapter support preview
---
 .../worker/management/GuessManagement.java         |   1 -
 .../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java     | 148 +++++++++++++--------
 .../adapters/plc4x/s7/PlcReadResponseHandler.java  |   9 ++
 .../event-schema/event-schema.component.html       |   2 +-
 4 files changed, 99 insertions(+), 61 deletions(-)

diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 9bc96f9b2..362c63a42 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -60,7 +60,6 @@ public class GuessManagement {
 
             throw new ParseException(errorClass + e.getMessage());
         } catch (Exception e) {
-            LOG.error("Unknown Error: " + e.toString());
             throw new AdapterException(e.getMessage(), e);
         }
 
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 2ee301ab1..f3e778a32 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -27,11 +27,12 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.util.PollingSettings;
-import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
 import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
@@ -56,8 +57,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
-public class Plc4xS7Adapter extends PullAdapter {
+public class Plc4xS7Adapter extends PullAdapter implements PlcReadResponseHandler {
 
     /**
      * A unique id to identify the Plc4xS7Adapter
@@ -133,31 +136,44 @@ public class Plc4xS7Adapter extends PullAdapter {
     public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
 
         // Extract user input
-        getConfigurations(adapterDescription);
+        try {
+            getConfigurations(adapterDescription);
 
-        if (this.pollingInterval < 10) {
-            throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
-        }
+            if (this.pollingInterval < 10) {
+                throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
+            }
 
-        GuessSchema guessSchema = new GuessSchema();
+            GuessSchema guessSchema = new GuessSchema();
 
-        EventSchema eventSchema = new EventSchema();
-        List<EventProperty> allProperties = new ArrayList<>();
+            EventSchema eventSchema = new EventSchema();
+            List<EventProperty> allProperties = new ArrayList<>();
 
-        for (Map<String, String> node : this.nodes) {
-            Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
-
-            allProperties.add(
-                    PrimitivePropertyBuilder
-                            .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
-                            .label(node.get(PLC_NODE_RUNTIME_NAME))
-                            .description("")
-                            .build());
-        }
+            for (Map<String, String> node : this.nodes) {
+                Datatypes datatype = getStreamPipesDataType(node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+
+                allProperties.add(
+                  PrimitivePropertyBuilder
+                    .create(datatype, node.get(PLC_NODE_RUNTIME_NAME))
+                    .label(node.get(PLC_NODE_RUNTIME_NAME))
+                    .description("")
+                    .build());
+            }
 
-        eventSchema.setEventProperties(allProperties);
-        guessSchema.setEventSchema(eventSchema);
-        return guessSchema;
+            var event = readPlcDataSynchronized();
+            var preview = event
+              .entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e ->
+                new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
+
+            eventSchema.setEventProperties(allProperties);
+            guessSchema.setEventSchema(eventSchema);
+            guessSchema.setEventPreview(List.of(preview));
+
+            return guessSchema;
+        } catch (PlcConnectionException | ExecutionException | InterruptedException | TimeoutException e) {
+            throw new AdapterException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -171,7 +187,6 @@ public class Plc4xS7Adapter extends PullAdapter {
 
         this.driverManager = new PooledPlcDriverManager();
         try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
-
             if (!plcConnection.getMetadata().canRead()) {
                 this.LOG.error("The S7 on IP: " + this.ip + " does not support reading data");
             }
@@ -188,49 +203,40 @@ public class Plc4xS7Adapter extends PullAdapter {
      */
     @Override
     protected void pullData() {
-
         // Create PLC read request
-        try (PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip)) {
-            PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-            for (Map<String, String> node : this.nodes) {
-                builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
-            }
-            PlcReadRequest readRequest = builder.build();
-
-            // Execute the request
-            CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
-
-            asyncResponse.whenComplete((response, throwable) -> {
-                // Create an event containing the value of the PLC
-                if (throwable != null) {
-                    throwable.printStackTrace();
-                    this.LOG.error(throwable.getMessage());
-                } else {
-                    Map<String, Object> event = new HashMap<>();
-                    for (Map<String, String> node : this.nodes) {
-                        if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
-                            event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
-                        } else {
-                            this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
-                                    response.getResponseCode(node.get(PLC_NODE_NAME)).name());
-                        }
-                    }
-
-                    // publish the final event
-                    adapterPipeline.process(event);
-                }
-            });
-
-        } catch (InterruptedException | ExecutionException e) {
-            this.LOG.error(e.getMessage());
-            e.printStackTrace();
-        } catch (Exception e) {
+        try {
+            readPlcData(this);
+        } catch (PlcConnectionException e) {
             this.LOG.error("Could not establish connection to S7 with ip " + this.ip, e);
             e.printStackTrace();
         }
 
     }
 
+    private PlcReadRequest makeReadRequest() throws PlcConnectionException {
+        PlcConnection plcConnection = this.driverManager.getConnection("s7://" + this.ip);
+        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+        for (Map<String, String> node : this.nodes) {
+            builder.addItem(node.get(PLC_NODE_NAME), node.get(PLC_NODE_NAME) + ":" + node.get(PLC_NODE_TYPE).toUpperCase().replaceAll(" ", "_"));
+        }
+        return builder.build();
+    }
+
+    private void readPlcData(PlcReadResponseHandler handler) throws PlcConnectionException {
+        var readRequest = makeReadRequest();
+        // Execute the request
+        CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+        asyncResponse.whenComplete(handler::onReadResult);
+    }
+
+    private Map<String, Object> readPlcDataSynchronized() throws PlcConnectionException, ExecutionException, InterruptedException, TimeoutException, AdapterException {
+        this.before();
+        var readRequest = makeReadRequest();
+        // Execute the request
+        var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
+        return makeEvent(readResponse);
+    }
+
     /**
      * Define the polling interval of this adapter. Default is to poll every second
      * @return
@@ -315,4 +321,28 @@ public class Plc4xS7Adapter extends PullAdapter {
         }
     }
 
+    @Override
+    public void onReadResult(PlcReadResponse response, Throwable throwable) {
+        if (throwable != null) {
+            throwable.printStackTrace();
+            this.LOG.error(throwable.getMessage());
+        } else {
+            var event = makeEvent(response);
+            // publish the final event
+            adapterPipeline.process(event);
+        }
+    }
+
+    private Map<String, Object> makeEvent(PlcReadResponse response) {
+        Map<String, Object> event = new HashMap<>();
+        for (Map<String, String> node : this.nodes) {
+            if (response.getResponseCode(node.get(PLC_NODE_NAME)) == PlcResponseCode.OK) {
+                event.put(node.get(PLC_NODE_RUNTIME_NAME), response.getObject(node.get(PLC_NODE_NAME)));
+            } else {
+                this.LOG.error("Error[" + node.get(PLC_NODE_NAME) + "]: " +
+                  response.getResponseCode(node.get(PLC_NODE_NAME)).name());
+            }
+        }
+        return event;
+    }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
new file mode 100644
index 000000000..071f27c5c
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/PlcReadResponseHandler.java
@@ -0,0 +1,9 @@
+package org.apache.streampipes.connect.iiot.adapters.plc4x.s7;
+
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public interface PlcReadResponseHandler {
+
+  void onReadResult(PlcReadResponse response,
+                    Throwable throwable);
+}
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
index 6709dad75..a478d6fbb 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.html
@@ -32,7 +32,7 @@
             </div>
         </div>
 
-        <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && schemaErrorHints.length > 0">
+        <div fxLayout="column" fxFlex="100" *ngIf="!isLoading && !isError && schemaErrorHints.length > 0">
             <div fxFlex="100"
                  fxLayout="column"
                  [ngClass]="schemaErrorHint.level === 'error' ? 'schema-validation schema-validation-error' : 'schema-validation schema-validation-warning'"


[incubator-streampipes] 01/02: [STREAMPIPES-577] Improve error handling of adapters

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit eeb30dae58ec6a48e27e627132ad7cf7711ad4f9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 11:50:15 2022 +0200

    [STREAMPIPES-577] Improve error handling of adapters
---
 .../master/management/GuessManagement.java         | 12 +++----
 .../worker/management/GuessManagement.java         |  2 +-
 .../container/worker/rest/GuessResource.java       | 10 +++---
 .../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java     |  2 +-
 .../iiot/protocol/stream/KafkaProtocol.java        | 41 ++++++++++++++--------
 .../event-schema/event-schema.component.ts         |  2 +-
 .../static-property.component.html                 |  2 ++
 ...c-runtime-resolvable-oneof-input.component.html | 24 +++----------
 ...tic-runtime-resolvable-oneof-input.component.ts | 15 ++++++++
 9 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index ce380864c..de66c5449 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -26,6 +26,7 @@ import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
 import org.apache.http.util.EntityUtils;
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterEventPreviewPipeline;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
@@ -34,7 +35,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
-import org.apache.streampipes.model.message.ErrorMessage;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,12 +69,10 @@ public class GuessManagement {
             String responseString = EntityUtils.toString(httpResponse.getEntity());
 
             if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                return mapper.readValue(responseString, GuessSchema.class);
-            }  else {
-                ErrorMessage errorMessage = mapper.readValue(responseString, ErrorMessage.class);
-
-                LOG.error(errorMessage.getElementName());
-                throw new WorkerAdapterException(errorMessage);
+              return mapper.readValue(responseString, GuessSchema.class);
+            } else {
+              var exception = mapper.readValue(responseString, SpConfigurationException.class);
+              throw new WorkerAdapterException(exception.getMessage(), exception.getCause());
             }
     }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 41f81def3..9bc96f9b2 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -61,7 +61,7 @@ public class GuessManagement {
             throw new ParseException(errorClass + e.getMessage());
         } catch (Exception e) {
             LOG.error("Unknown Error: " + e.toString());
-            throw new AdapterException(e.toString());
+            throw new AdapterException(e.getMessage(), e);
         }
 
         return guessSchema;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 39d87840c..c357a5493 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -18,11 +18,11 @@
 
 package org.apache.streampipes.connect.container.worker.rest;
 
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.container.worker.management.GuessManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
 import org.slf4j.Logger;
@@ -62,10 +62,10 @@ public class GuessResource extends AbstractSharedRestInterface {
           return ok(result);
       } catch (ParseException e) {
           logger.error("Error while parsing events: ", e);
-          return serverError(Notifications.error(e.getMessage()));
-      } catch (Exception e) {
-          logger.error("Error while guess schema for AdapterDescription: " + adapterDescription.getElementId(), e);
-          return serverError(Notifications.error(e.getMessage()));
+          return serverError(e);
+      } catch (AdapterException e) {
+          logger.error("Error while guessing schema for AdapterDescription: {}, {}", adapterDescription.getElementId(), e.getMessage());
+          return serverError(e);
       }
 
   }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 04ece8aaa..2ee301ab1 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -136,7 +136,7 @@ public class Plc4xS7Adapter extends PullAdapter {
         getConfigurations(adapterDescription);
 
         if (this.pollingInterval < 10) {
-            throw new AdapterException("Polling interval must be higher then 10. Current value: " + this.pollingInterval);
+            throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
         }
 
         GuessSchema guessSchema = new GuessSchema();
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index 502aa7ce7..cdc8cb492 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -20,9 +20,11 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -30,7 +32,7 @@ import org.apache.streampipes.connect.api.IAdapterPipeline;
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.SupportsRuntimeConfig;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.apache.streampipes.model.AdapterType;
@@ -38,6 +40,8 @@ import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.pe.shared.config.kafka.KafkaConfig;
 import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
@@ -45,7 +49,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +58,7 @@ import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerProvidedOptions {
+public class KafkaProtocol extends BrokerProtocol implements SupportsRuntimeConfig {
 
     Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
     KafkaConfig config;
@@ -154,7 +157,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         return resultEventsByte;
     }
 
-    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
+    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
         final Properties props = new Properties();
 
         kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -165,6 +168,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 "KafkaExampleConsumer" + System.currentTimeMillis());
 
+        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
+
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -214,22 +219,30 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
     }
 
     @Override
-    public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
+    public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) throws SpConfigurationException {
+        RuntimeResolvableOneOfStaticProperty config = extractor
+          .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
         KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
         boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
 
-        Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+        try {
+            Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+            Set<String> topics = consumer.listTopics().keySet();
+            consumer.close();
+
+            if (hideInternalTopics) {
+                topics = topics
+                  .stream()
+                  .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
+                  .collect(Collectors.toSet());
+            }
 
-        Set<String> topics = consumer.listTopics().keySet();
-        consumer.close();
+            config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
 
-        if (hideInternalTopics) {
-            topics = topics
-                    .stream()
-                    .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
-                    .collect(Collectors.toSet());
+            return config;
+        } catch (KafkaException e) {
+            throw new SpConfigurationException(e.getMessage(), e);
         }
-        return topics.stream().map(Option::new).collect(Collectors.toList());
     }
 
 
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
index e7867d91f..76d9bdaf1 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
@@ -129,7 +129,7 @@ export class EventSchemaComponent implements OnChanges {
         this.isEditableChange.emit(true);
         this.isLoading = false;
 
-        if (guessSchema.eventPreview) {
+        if (guessSchema.eventPreview && guessSchema.eventPreview.length > 0) {
           this.updatePreview();
         }
       },
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index 308ce518d..c7d89676f 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -68,6 +68,7 @@
                                                      [staticProperties]="staticProperties"
                                                      [eventSchemas]="eventSchemas"
                                                      [pipelineElement]="pipelineElement"
+                                                     [parentForm]="parentForm"
                                                      [completedStaticProperty]="completedStaticProperty"
                                                      [adapterId]="adapterId">
             </app-static-runtime-resolvable-any-input>
@@ -77,6 +78,7 @@
                                                        [pipelineElement]="pipelineElement"
                                                        [eventSchemas]="eventSchemas"
                                                        [staticProperty]="staticProperty"
+                                                       [parentForm]="parentForm"
                                                        [staticProperties]="staticProperties"
                                                        [adapterId]="adapterId"></app-static-runtime-resolvable-oneof-input>
 
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
index 265866cf3..3f1ff1076 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
@@ -16,7 +16,7 @@
   ~
   -->
 
-<div id="formWrapper" fxFlex="100" fxLayout="column">
+<div [formGroup]="parentForm" id="formWrapper" fxFlex="100" fxLayout="column">
     <div>
         <button mat-button mat-raised-button color="accent" class="small-button"
                 data-cy="sp-reload"
@@ -26,7 +26,10 @@
             <span>Reload</span>
         </button>
     </div>
-    <div *ngIf="!staticProperty.horizontalRendering" fxLayout="column">
+    <div fxLayout="column" *ngIf="error" class="mt-10">
+        <sp-exception-message [message]="errorMessage"></sp-exception-message>
+    </div>
+    <div *ngIf="!loading" fxLayout="column">
         <div fxFlex fxLayout="row">
             <div fxLayout="column" *ngIf="showOptions || staticProperty.options" style="margin-left: 10px">
                 <mat-radio-button *ngFor="let option of staticProperty.options"
@@ -49,21 +52,4 @@
             </div>
         </div>
     </div>
-
-    <div *ngIf="staticProperty.horizontalRendering">
-        <mat-form-field style="width: 100%">
-            <mat-label>{{staticProperty.label}}</mat-label>
-            <span matPrefix *ngIf="loading"><mat-spinner style="top:5px" [diameter]="20"></mat-spinner></span>
-            <mat-select>
-                <mat-option *ngFor="let option of staticProperty.options" [value]="option.elementId"
-                            (click)="select(option.elementId)">
-                    <label style="font-weight: normal">
-                        {{option.name}}
-                    </label>
-                </mat-option>
-            </mat-select>
-            <mat-hint>{{staticProperty.description}}</mat-hint>
-        </mat-form-field>
-    </div>
-
 </div>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index cae01d7c1..f5b973c11 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -20,6 +20,7 @@ import { Component, OnChanges, OnInit } from '@angular/core';
 import { RuntimeResolvableOneOfStaticProperty, StaticPropertyUnion } from '@streampipes/platform-services';
 import { RuntimeResolvableService } from '../static-runtime-resolvable-input/runtime-resolvable.service';
 import { BaseRuntimeResolvableSelectionInput } from '../static-runtime-resolvable-input/base-runtime-resolvable-selection-input';
+import { FormControl } from '@angular/forms';
 
 @Component({
     selector: 'app-static-runtime-resolvable-oneof-input',
@@ -35,6 +36,8 @@ export class StaticRuntimeResolvableOneOfInputComponent
 
     ngOnInit() {
         super.onInit();
+        this.parentForm.addControl(this.staticProperty.internalName, new FormControl(this.staticProperty.options, [])); // register a control under internalName, valued with the current options
+        this.performValidation(); // set the control's initial error state from the options' selected flags
     }
 
     afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
@@ -49,6 +52,7 @@ export class StaticRuntimeResolvableOneOfInputComponent
             option.selected = false;
         }
         this.staticProperty.options.find(option => option.elementId === id).selected = true;
+        this.performValidation();
     }
 
     parse(staticProperty: StaticPropertyUnion): RuntimeResolvableOneOfStaticProperty {
@@ -56,5 +60,16 @@ export class StaticRuntimeResolvableOneOfInputComponent
     }
 
     afterErrorReceived() {
+        this.staticProperty.options = []; // drop all options so none can count as selected
+        this.performValidation(); // with no selected option, this sets an error on the form control
+    }
+
+    performValidation() {
+        // The control is valid only when at least one option is flagged as selected.
+        const options = this.staticProperty.options;
+        const hasSelection = !!options && options.some(o => o.selected);
+        // Passing null clears the control's errors; a non-null object marks it invalid.
+        const errors = hasSelection ? null : {error: true};
+        this.parentForm.controls[this.staticProperty.internalName].setErrors(errors);
     }
 }