You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 11:29:03 UTC

[incubator-streampipes] 01/02: [STREAMPIPES-577] Improve error handling of adapters

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit eeb30dae58ec6a48e27e627132ad7cf7711ad4f9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 11:50:15 2022 +0200

    [STREAMPIPES-577] Improve error handling of adapters
---
 .../master/management/GuessManagement.java         | 12 +++----
 .../worker/management/GuessManagement.java         |  2 +-
 .../container/worker/rest/GuessResource.java       | 10 +++---
 .../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java     |  2 +-
 .../iiot/protocol/stream/KafkaProtocol.java        | 41 ++++++++++++++--------
 .../event-schema/event-schema.component.ts         |  2 +-
 .../static-property.component.html                 |  2 ++
 ...c-runtime-resolvable-oneof-input.component.html | 24 +++----------
 ...tic-runtime-resolvable-oneof-input.component.ts | 15 ++++++++
 9 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index ce380864c..de66c5449 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -26,6 +26,7 @@ import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
 import org.apache.http.util.EntityUtils;
 import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterEventPreviewPipeline;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
@@ -34,7 +35,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
-import org.apache.streampipes.model.message.ErrorMessage;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,12 +69,10 @@ public class GuessManagement {
             String responseString = EntityUtils.toString(httpResponse.getEntity());
 
             if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                return mapper.readValue(responseString, GuessSchema.class);
-            }  else {
-                ErrorMessage errorMessage = mapper.readValue(responseString, ErrorMessage.class);
-
-                LOG.error(errorMessage.getElementName());
-                throw new WorkerAdapterException(errorMessage);
+              return mapper.readValue(responseString, GuessSchema.class);
+            } else {
+              var exception = mapper.readValue(responseString, SpConfigurationException.class);
+              throw new WorkerAdapterException(exception.getMessage(), exception.getCause());
             }
     }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 41f81def3..9bc96f9b2 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -61,7 +61,7 @@ public class GuessManagement {
             throw new ParseException(errorClass + e.getMessage());
         } catch (Exception e) {
             LOG.error("Unknown Error: " + e.toString());
-            throw new AdapterException(e.toString());
+            throw new AdapterException(e.getMessage(), e);
         }
 
         return guessSchema;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 39d87840c..c357a5493 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -18,11 +18,11 @@
 
 package org.apache.streampipes.connect.container.worker.rest;
 
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.container.worker.management.GuessManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
 import org.slf4j.Logger;
@@ -62,10 +62,10 @@ public class GuessResource extends AbstractSharedRestInterface {
           return ok(result);
       } catch (ParseException e) {
           logger.error("Error while parsing events: ", e);
-          return serverError(Notifications.error(e.getMessage()));
-      } catch (Exception e) {
-          logger.error("Error while guess schema for AdapterDescription: " + adapterDescription.getElementId(), e);
-          return serverError(Notifications.error(e.getMessage()));
+          return serverError(e);
+      } catch (AdapterException e) {
+          logger.error("Error while guessing schema for AdapterDescription: {}, {}", adapterDescription.getElementId(), e.getMessage());
+          return serverError(e);
       }
 
   }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 04ece8aaa..2ee301ab1 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -136,7 +136,7 @@ public class Plc4xS7Adapter extends PullAdapter {
         getConfigurations(adapterDescription);
 
         if (this.pollingInterval < 10) {
-            throw new AdapterException("Polling interval must be higher then 10. Current value: " + this.pollingInterval);
+            throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
         }
 
         GuessSchema guessSchema = new GuessSchema();
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index 502aa7ce7..cdc8cb492 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -20,9 +20,11 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -30,7 +32,7 @@ import org.apache.streampipes.connect.api.IAdapterPipeline;
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.SupportsRuntimeConfig;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.apache.streampipes.model.AdapterType;
@@ -38,6 +40,8 @@ import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.pe.shared.config.kafka.KafkaConfig;
 import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
@@ -45,7 +49,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +58,7 @@ import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerProvidedOptions {
+public class KafkaProtocol extends BrokerProtocol implements SupportsRuntimeConfig {
 
     Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
     KafkaConfig config;
@@ -154,7 +157,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         return resultEventsByte;
     }
 
-    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
+    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
         final Properties props = new Properties();
 
         kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -165,6 +168,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 "KafkaExampleConsumer" + System.currentTimeMillis());
 
+        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
+
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -214,22 +219,30 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
     }
 
     @Override
-    public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
+    public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) throws SpConfigurationException {
+        RuntimeResolvableOneOfStaticProperty config = extractor
+          .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
         KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
         boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
 
-        Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+        try {
+            Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+            Set<String> topics = consumer.listTopics().keySet();
+            consumer.close();
+
+            if (hideInternalTopics) {
+                topics = topics
+                  .stream()
+                  .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
+                  .collect(Collectors.toSet());
+            }
 
-        Set<String> topics = consumer.listTopics().keySet();
-        consumer.close();
+            config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
 
-        if (hideInternalTopics) {
-            topics = topics
-                    .stream()
-                    .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
-                    .collect(Collectors.toSet());
+            return config;
+        } catch (KafkaException e) {
+            throw new SpConfigurationException(e.getMessage(), e);
         }
-        return topics.stream().map(Option::new).collect(Collectors.toList());
     }
 
 
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
index e7867d91f..76d9bdaf1 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
@@ -129,7 +129,7 @@ export class EventSchemaComponent implements OnChanges {
         this.isEditableChange.emit(true);
         this.isLoading = false;
 
-        if (guessSchema.eventPreview) {
+        if (guessSchema.eventPreview && guessSchema.eventPreview.length > 0) {
           this.updatePreview();
         }
       },
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index 308ce518d..c7d89676f 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -68,6 +68,7 @@
                                                      [staticProperties]="staticProperties"
                                                      [eventSchemas]="eventSchemas"
                                                      [pipelineElement]="pipelineElement"
+                                                     [parentForm]="parentForm"
                                                      [completedStaticProperty]="completedStaticProperty"
                                                      [adapterId]="adapterId">
             </app-static-runtime-resolvable-any-input>
@@ -77,6 +78,7 @@
                                                        [pipelineElement]="pipelineElement"
                                                        [eventSchemas]="eventSchemas"
                                                        [staticProperty]="staticProperty"
+                                                       [parentForm]="parentForm"
                                                        [staticProperties]="staticProperties"
                                                        [adapterId]="adapterId"></app-static-runtime-resolvable-oneof-input>
 
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
index 265866cf3..3f1ff1076 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
@@ -16,7 +16,7 @@
   ~
   -->
 
-<div id="formWrapper" fxFlex="100" fxLayout="column">
+<div [formGroup]="parentForm" id="formWrapper" fxFlex="100" fxLayout="column">
     <div>
         <button mat-button mat-raised-button color="accent" class="small-button"
                 data-cy="sp-reload"
@@ -26,7 +26,10 @@
             <span>Reload</span>
         </button>
     </div>
-    <div *ngIf="!staticProperty.horizontalRendering" fxLayout="column">
+    <div fxLayout="column" *ngIf="error" class="mt-10">
+        <sp-exception-message [message]="errorMessage"></sp-exception-message>
+    </div>
+    <div *ngIf="!loading" fxLayout="column">
         <div fxFlex fxLayout="row">
             <div fxLayout="column" *ngIf="showOptions || staticProperty.options" style="margin-left: 10px">
                 <mat-radio-button *ngFor="let option of staticProperty.options"
@@ -49,21 +52,4 @@
             </div>
         </div>
     </div>
-
-    <div *ngIf="staticProperty.horizontalRendering">
-        <mat-form-field style="width: 100%">
-            <mat-label>{{staticProperty.label}}</mat-label>
-            <span matPrefix *ngIf="loading"><mat-spinner style="top:5px" [diameter]="20"></mat-spinner></span>
-            <mat-select>
-                <mat-option *ngFor="let option of staticProperty.options" [value]="option.elementId"
-                            (click)="select(option.elementId)">
-                    <label style="font-weight: normal">
-                        {{option.name}}
-                    </label>
-                </mat-option>
-            </mat-select>
-            <mat-hint>{{staticProperty.description}}</mat-hint>
-        </mat-form-field>
-    </div>
-
 </div>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index cae01d7c1..f5b973c11 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -20,6 +20,7 @@ import { Component, OnChanges, OnInit } from '@angular/core';
 import { RuntimeResolvableOneOfStaticProperty, StaticPropertyUnion } from '@streampipes/platform-services';
 import { RuntimeResolvableService } from '../static-runtime-resolvable-input/runtime-resolvable.service';
 import { BaseRuntimeResolvableSelectionInput } from '../static-runtime-resolvable-input/base-runtime-resolvable-selection-input';
+import { FormControl } from '@angular/forms';
 
 @Component({
     selector: 'app-static-runtime-resolvable-oneof-input',
@@ -35,6 +36,8 @@ export class StaticRuntimeResolvableOneOfInputComponent
 
     ngOnInit() {
         super.onInit();
+        this.parentForm.addControl(this.staticProperty.internalName, new FormControl(this.staticProperty.options, []));
+        this.performValidation();
     }
 
     afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
@@ -49,6 +52,7 @@ export class StaticRuntimeResolvableOneOfInputComponent
             option.selected = false;
         }
         this.staticProperty.options.find(option => option.elementId === id).selected = true;
+        this.performValidation();
     }
 
     parse(staticProperty: StaticPropertyUnion): RuntimeResolvableOneOfStaticProperty {
@@ -56,5 +60,16 @@ export class StaticRuntimeResolvableOneOfInputComponent
     }
 
     afterErrorReceived() {
+        this.staticProperty.options = [];
+        this.performValidation();
+    }
+
+    performValidation() {
+        let error = {error: true};
+        if (this.staticProperty.options && this.staticProperty.options.find(o => o.selected) !== undefined) {
+            error = undefined;
+        }
+        console.log(error);
+        this.parentForm.controls[this.staticProperty.internalName].setErrors(error);
     }
 }