You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/18 20:33:14 UTC
[incubator-streampipes] 08/15: [STREAMPIPES-577] Improve error handling of adapters
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit df098481207c92101e1deeb0dee6c834ec568836
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 11:50:15 2022 +0200
[STREAMPIPES-577] Improve error handling of adapters
---
.../master/management/GuessManagement.java | 12 +++----
.../worker/management/GuessManagement.java | 2 +-
.../container/worker/rest/GuessResource.java | 10 +++---
.../iiot/adapters/plc4x/s7/Plc4xS7Adapter.java | 2 +-
.../iiot/protocol/stream/KafkaProtocol.java | 41 ++++++++++++++--------
.../event-schema/event-schema.component.ts | 2 +-
.../static-property.component.html | 2 ++
...c-runtime-resolvable-oneof-input.component.html | 24 +++----------
...tic-runtime-resolvable-oneof-input.component.ts | 15 ++++++++
9 files changed, 62 insertions(+), 48 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index ce380864c..de66c5449 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -26,6 +26,7 @@ import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterEventPreviewPipeline;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
@@ -34,7 +35,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
-import org.apache.streampipes.model.message.ErrorMessage;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,12 +69,10 @@ public class GuessManagement {
String responseString = EntityUtils.toString(httpResponse.getEntity());
if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- return mapper.readValue(responseString, GuessSchema.class);
- } else {
- ErrorMessage errorMessage = mapper.readValue(responseString, ErrorMessage.class);
-
- LOG.error(errorMessage.getElementName());
- throw new WorkerAdapterException(errorMessage);
+ return mapper.readValue(responseString, GuessSchema.class);
+ } else {
+ var exception = mapper.readValue(responseString, SpConfigurationException.class);
+ throw new WorkerAdapterException(exception.getMessage(), exception.getCause());
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index 41f81def3..9bc96f9b2 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -61,7 +61,7 @@ public class GuessManagement {
throw new ParseException(errorClass + e.getMessage());
} catch (Exception e) {
LOG.error("Unknown Error: " + e.toString());
- throw new AdapterException(e.toString());
+ throw new AdapterException(e.getMessage(), e);
}
return guessSchema;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 39d87840c..c357a5493 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -18,11 +18,11 @@
package org.apache.streampipes.connect.container.worker.rest;
+import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.connect.container.worker.management.GuessManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
import org.slf4j.Logger;
@@ -62,10 +62,10 @@ public class GuessResource extends AbstractSharedRestInterface {
return ok(result);
} catch (ParseException e) {
logger.error("Error while parsing events: ", e);
- return serverError(Notifications.error(e.getMessage()));
- } catch (Exception e) {
- logger.error("Error while guess schema for AdapterDescription: " + adapterDescription.getElementId(), e);
- return serverError(Notifications.error(e.getMessage()));
+ return serverError(e);
+ } catch (AdapterException e) {
+ logger.error("Error while guessing schema for AdapterDescription: {}, {}", adapterDescription.getElementId(), e.getMessage());
+ return serverError(e);
}
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
index 04ece8aaa..2ee301ab1 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -136,7 +136,7 @@ public class Plc4xS7Adapter extends PullAdapter {
getConfigurations(adapterDescription);
if (this.pollingInterval < 10) {
- throw new AdapterException("Polling interval must be higher then 10. Current value: " + this.pollingInterval);
+ throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval);
}
GuessSchema guessSchema = new GuessSchema();
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index 502aa7ce7..cdc8cb492 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -20,9 +20,11 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.connect.SendToPipeline;
import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -30,7 +32,7 @@ import org.apache.streampipes.connect.api.IAdapterPipeline;
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.SupportsRuntimeConfig;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.model.AdapterType;
@@ -38,6 +40,8 @@ import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.pe.shared.config.kafka.KafkaConfig;
import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
@@ -45,7 +49,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.AdapterSourceType;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +58,7 @@ import java.io.InputStream;
import java.util.*;
import java.util.stream.Collectors;
-public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerProvidedOptions {
+public class KafkaProtocol extends BrokerProtocol implements SupportsRuntimeConfig {
Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
KafkaConfig config;
@@ -154,7 +157,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
return resultEventsByte;
}
- private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
+ private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
final Properties props = new Properties();
kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -165,6 +168,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer" + System.currentTimeMillis());
+ props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -214,22 +219,30 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
}
@Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
+ public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) throws SpConfigurationException {
+ RuntimeResolvableOneOfStaticProperty config = extractor
+ .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
- Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+ try {
+ Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
+ Set<String> topics = consumer.listTopics().keySet();
+ consumer.close();
+
+ if (hideInternalTopics) {
+ topics = topics
+ .stream()
+ .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
+ .collect(Collectors.toSet());
+ }
- Set<String> topics = consumer.listTopics().keySet();
- consumer.close();
+ config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
- if (hideInternalTopics) {
- topics = topics
- .stream()
- .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
- .collect(Collectors.toSet());
+ return config;
+ } catch (KafkaException e) {
+ throw new SpConfigurationException(e.getMessage(), e);
}
- return topics.stream().map(Option::new).collect(Collectors.toList());
}
diff --git a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
index e7867d91f..76d9bdaf1 100644
--- a/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
+++ b/ui/src/app/connect/components/new-adapter/schema-editor/event-schema/event-schema.component.ts
@@ -129,7 +129,7 @@ export class EventSchemaComponent implements OnChanges {
this.isEditableChange.emit(true);
this.isLoading = false;
- if (guessSchema.eventPreview) {
+ if (guessSchema.eventPreview && guessSchema.eventPreview.length > 0) {
this.updatePreview();
}
},
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index 308ce518d..c7d89676f 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -68,6 +68,7 @@
[staticProperties]="staticProperties"
[eventSchemas]="eventSchemas"
[pipelineElement]="pipelineElement"
+ [parentForm]="parentForm"
[completedStaticProperty]="completedStaticProperty"
[adapterId]="adapterId">
</app-static-runtime-resolvable-any-input>
@@ -77,6 +78,7 @@
[pipelineElement]="pipelineElement"
[eventSchemas]="eventSchemas"
[staticProperty]="staticProperty"
+ [parentForm]="parentForm"
[staticProperties]="staticProperties"
[adapterId]="adapterId"></app-static-runtime-resolvable-oneof-input>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
index 265866cf3..3f1ff1076 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
@@ -16,7 +16,7 @@
~
-->
-<div id="formWrapper" fxFlex="100" fxLayout="column">
+<div [formGroup]="parentForm" id="formWrapper" fxFlex="100" fxLayout="column">
<div>
<button mat-button mat-raised-button color="accent" class="small-button"
data-cy="sp-reload"
@@ -26,7 +26,10 @@
<span>Reload</span>
</button>
</div>
- <div *ngIf="!staticProperty.horizontalRendering" fxLayout="column">
+ <div fxLayout="column" *ngIf="error" class="mt-10">
+ <sp-exception-message [message]="errorMessage"></sp-exception-message>
+ </div>
+ <div *ngIf="!loading" fxLayout="column">
<div fxFlex fxLayout="row">
<div fxLayout="column" *ngIf="showOptions || staticProperty.options" style="margin-left: 10px">
<mat-radio-button *ngFor="let option of staticProperty.options"
@@ -49,21 +52,4 @@
</div>
</div>
</div>
-
- <div *ngIf="staticProperty.horizontalRendering">
- <mat-form-field style="width: 100%">
- <mat-label>{{staticProperty.label}}</mat-label>
- <span matPrefix *ngIf="loading"><mat-spinner style="top:5px" [diameter]="20"></mat-spinner></span>
- <mat-select>
- <mat-option *ngFor="let option of staticProperty.options" [value]="option.elementId"
- (click)="select(option.elementId)">
- <label style="font-weight: normal">
- {{option.name}}
- </label>
- </mat-option>
- </mat-select>
- <mat-hint>{{staticProperty.description}}</mat-hint>
- </mat-form-field>
- </div>
-
</div>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index cae01d7c1..f5b973c11 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -20,6 +20,7 @@ import { Component, OnChanges, OnInit } from '@angular/core';
import { RuntimeResolvableOneOfStaticProperty, StaticPropertyUnion } from '@streampipes/platform-services';
import { RuntimeResolvableService } from '../static-runtime-resolvable-input/runtime-resolvable.service';
import { BaseRuntimeResolvableSelectionInput } from '../static-runtime-resolvable-input/base-runtime-resolvable-selection-input';
+import { FormControl } from '@angular/forms';
@Component({
selector: 'app-static-runtime-resolvable-oneof-input',
@@ -35,6 +36,8 @@ export class StaticRuntimeResolvableOneOfInputComponent
ngOnInit() {
super.onInit();
+ this.parentForm.addControl(this.staticProperty.internalName, new FormControl(this.staticProperty.options, []));
+ this.performValidation();
}
afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
@@ -49,6 +52,7 @@ export class StaticRuntimeResolvableOneOfInputComponent
option.selected = false;
}
this.staticProperty.options.find(option => option.elementId === id).selected = true;
+ this.performValidation();
}
parse(staticProperty: StaticPropertyUnion): RuntimeResolvableOneOfStaticProperty {
@@ -56,5 +60,16 @@ export class StaticRuntimeResolvableOneOfInputComponent
}
afterErrorReceived() {
+ this.staticProperty.options = [];
+ this.performValidation();
+ }
+
+ performValidation() {
+ let error = {error: true};
+ if (this.staticProperty.options && this.staticProperty.options.find(o => o.selected) !== undefined) {
+ error = undefined;
+ }
+ console.log(error);
+ this.parentForm.controls[this.staticProperty.internalName].setErrors(error);
}
}