You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/01 16:19:07 UTC
[incubator-streampipes] branch dev updated: [hotfix] Fix data lake pipeline for generic adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 46a220d [hotfix] Fix data lake pipeline for generic adapters
46a220d is described below
commit 46a220db6bddfcb1e1c25dd7b070e270a5f2141b
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri Oct 1 18:17:33 2021 +0200
[hotfix] Fix data lake pipeline for generic adapters
---
.../master/management/AdapterMasterManagement.java | 4 +-
.../master/management/SourcesManagement.java | 4 +-
.../master/management/WorkerRestClient.java | 7 +-
.../support/builder/GenericAdapterBuilder.ts | 87 ++++++++++++----------
ui/cypress/support/builder/PipelineBuilder.ts | 1 +
ui/cypress/support/utils/StaticPropertyUtils.ts | 30 ++++----
ui/cypress/tests/adapter/KafkaAdapter.ts | 73 ++++++++++++++++++
ui/cypress/tests/adapter/machineDataSimulator.ts | 15 +++-
.../adapter-started-dialog.component.ts | 13 +++-
.../static-alternatives.component.html | 12 ++-
...c-runtime-resolvable-oneof-input.component.html | 8 +-
11 files changed, 182 insertions(+), 72 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index f2ca003..c1c40ad 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -102,13 +102,13 @@ public class AdapterMasterManagement {
// start when stream adapter
if (ad instanceof AdapterStreamDescription) {
- // TODO
WorkerRestClient.invokeStreamAdapter(endpointUrl, adapterId);
LOG.info("Start adapter");
}
LOG.info("Install source (source URL: {} in backend", ad.getElementId());
- SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+ SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getAdapterId());
+ storedDescription.setCorrespondingAdapterId(adapterId);
installDataSource(storedDescription, username);
return storedDescription.getElementId();
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 849ea95..b731aab 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -140,7 +140,7 @@ public class SourcesManagement {
AdapterDescription adapterDescription = null;
List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
for (AdapterDescription a : allAdapters) {
- if (a.getUri().endsWith(id)) {
+ if (a.getAdapterId().equals(id)) {
adapterDescription = a;
}
}
@@ -182,7 +182,7 @@ public class SourcesManagement {
ds.setName(adapterDescription.getName());
ds.setDescription(adapterDescription.getDescription());
- ds.setCorrespondingAdapterId(adapterDescription.getAdapterId());
+ ds.setCorrespondingAdapterId(adapterDescription.getAppId());
ds.setInternallyManaged(true);
ds.setUri(url);
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index cb37eb2..04d33d0 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -41,7 +41,6 @@ import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.UUID;
public class WorkerRestClient {
@@ -84,9 +83,9 @@ public class WorkerRestClient {
logger.info("Trying to start adapter on endpoint: " + url);
// this ensures that all adapters have a valid uri otherwise the json-ld serializer fails
- if (ad.getUri() == null) {
- ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
- }
+// if (ad.getUri() == null) {
+// ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
+// }
String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
diff --git a/ui/cypress/support/builder/GenericAdapterBuilder.ts b/ui/cypress/support/builder/GenericAdapterBuilder.ts
index d496758..279e92e 100644
--- a/ui/cypress/support/builder/GenericAdapterBuilder.ts
+++ b/ui/cypress/support/builder/GenericAdapterBuilder.ts
@@ -20,57 +20,62 @@ import { UserInput } from '../model/UserInput';
import { GenericAdapterInput } from '../model/GenericAdapterInput';
export class GenericAdapterBuilder {
- genericAdapterInput: GenericAdapterInput;
+ genericAdapterInput: GenericAdapterInput;
- constructor(type: string) {
- this.genericAdapterInput = new GenericAdapterInput();
- this.genericAdapterInput.adapterType = type;
- this.genericAdapterInput.protocolConfiguration = [];
- this.genericAdapterInput.formatConfiguration = [];
- }
+ constructor(type: string) {
+ this.genericAdapterInput = new GenericAdapterInput();
+ this.genericAdapterInput.adapterType = type;
+ this.genericAdapterInput.protocolConfiguration = [];
+ this.genericAdapterInput.formatConfiguration = [];
+ }
- public static create(name: string) {
- return new GenericAdapterBuilder(name);
- }
+ public static create(name: string) {
+ return new GenericAdapterBuilder(name);
+ }
- public setName(name: string) {
- this.genericAdapterInput.adapterName = name;
- return this;
- }
+ public setName(name: string) {
+ this.genericAdapterInput.adapterName = name;
+ return this;
+ }
- public setTimestampProperty(timestsmpProperty: string) {
- this.genericAdapterInput.timestampProperty = timestsmpProperty;
- return this;
- }
+ public setTimestampProperty(timestsmpProperty: string) {
+ this.genericAdapterInput.timestampProperty = timestsmpProperty;
+ return this;
+ }
- public addProtocolInput(type: string, selector: string, value: string) {
- const userInput = new UserInput();
- userInput.type = type;
- userInput.selector = selector;
- userInput.value = value;
+ public setStoreInDataLake() {
+ this.genericAdapterInput.storeInDataLake = true;
+ return this;
+ }
- this.genericAdapterInput.protocolConfiguration.push(userInput);
+ public addProtocolInput(type: string, selector: string, value: string) {
+ const userInput = new UserInput();
+ userInput.type = type;
+ userInput.selector = selector;
+ userInput.value = value;
- return this;
- }
+ this.genericAdapterInput.protocolConfiguration.push(userInput);
- public setFormat(format: string) {
- this.genericAdapterInput.format = format;
- return this;
- }
+ return this;
+ }
- public addFormatInput(type: string, selector: string, value: string) {
- const userInput = new UserInput();
- userInput.type = type;
- userInput.selector = selector;
- userInput.value = value;
+ public setFormat(format: string) {
+ this.genericAdapterInput.format = format;
+ return this;
+ }
- this.genericAdapterInput.formatConfiguration.push(userInput);
+ public addFormatInput(type: string, selector: string, value: string) {
+ const userInput = new UserInput();
+ userInput.type = type;
+ userInput.selector = selector;
+ userInput.value = value;
- return this;
- }
+ this.genericAdapterInput.formatConfiguration.push(userInput);
- build() {
- return this.genericAdapterInput;
- }
+ return this;
+ }
+
+ build() {
+ return this.genericAdapterInput;
+ }
}
diff --git a/ui/cypress/support/builder/PipelineBuilder.ts b/ui/cypress/support/builder/PipelineBuilder.ts
index 0564aa8..7b69f91 100644
--- a/ui/cypress/support/builder/PipelineBuilder.ts
+++ b/ui/cypress/support/builder/PipelineBuilder.ts
@@ -42,6 +42,7 @@ export class PipelineBuilder {
return this;
}
+
public addProcessingElement(processingElement: PipelineElementInput) {
this.pipeline.processingElement = processingElement;
diff --git a/ui/cypress/support/utils/StaticPropertyUtils.ts b/ui/cypress/support/utils/StaticPropertyUtils.ts
index 03906f9..5216e02 100644
--- a/ui/cypress/support/utils/StaticPropertyUtils.ts
+++ b/ui/cypress/support/utils/StaticPropertyUtils.ts
@@ -20,19 +20,21 @@ import { UserInput } from '../model/UserInput';
export class StaticPropertyUtils {
- public static input(configs: UserInput[]) {
+ public static input(configs: UserInput[]) {
- // Configure Properties
- configs.forEach(config => {
- if (config.type === 'checkbox') {
- cy.dataCy(config.selector).children().click();
- } else if (config.type === 'drop-down') {
- cy.dataCy(config.selector).click().get('mat-option').contains(config.value).click();
- } else if (config.type === 'radio') {
- cy.dataCy(config.selector + config.value ).click();
- } else {
- cy.dataCy(config.selector).type(config.value);
- }
- });
- }
+ // Configure Properties
+ configs.forEach(config => {
+ if (config.type === 'checkbox') {
+ cy.dataCy(config.selector).children().click();
+ } else if (config.type === 'drop-down') {
+ cy.dataCy(config.selector).click().get('mat-option').contains(config.value).click();
+ } else if (config.type === 'radio') {
+ cy.dataCy(config.selector + config.value).click();
+ } else if (config.type === 'click') {
+ cy.dataCy(config.selector).click({ force: true });
+ } else {
+ cy.dataCy(config.selector).type(config.value);
+ }
+ });
+ }
}
diff --git a/ui/cypress/tests/adapter/KafkaAdapter.ts b/ui/cypress/tests/adapter/KafkaAdapter.ts
new file mode 100644
index 0000000..089ff23
--- /dev/null
+++ b/ui/cypress/tests/adapter/KafkaAdapter.ts
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { AdapterUtils } from '../../support/utils/AdapterUtils';
+import { GenericAdapterBuilder } from '../../support/builder/GenericAdapterBuilder';
+import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';
+import { PipelineBuilder } from '../../support/builder/PipelineBuilder';
+import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
+import { PipelineUtils } from '../../support/utils/PipelineUtils';
+
+describe('Test Random Data Simulator Stream Adapter', () => {
+ before('Setup Test', () => {
+ cy.initStreamPipesTest();
+ });
+
+ it('Perform Test', () => {
+ const simulatorAdapterName = 'simulator';
+
+
+ const machineAdapter = SpecificAdapterBuilder
+ .create('Machine_Data_Simulator')
+ .setName(simulatorAdapterName)
+ .addInput('input', 'wait-time-ms', '1000')
+ .build();
+
+ AdapterUtils.testSpecificStreamAdapter(machineAdapter);
+
+ const topicname = 'cypresstopic';
+ const pipelineInput = PipelineBuilder.create('Pipeline Test')
+ .addSource(simulatorAdapterName)
+ .addSink(
+ PipelineElementBuilder.create('kafka_publisher')
+ .addInput('select', 'Unauthenticated', 'check')
+ .addInput('input', 'host', 'localhost')
+ .addInput('input', 'port', '{backspace}{backspace}{backspace}{backspace}9094')
+ .addInput('input', 'topic', topicname)
+ .build())
+ .build();
+
+ PipelineUtils.testPipeline(pipelineInput);
+
+ const adapterInput = GenericAdapterBuilder
+ .create('Apache_Kafka')
+ .setName('Kafka4')
+ .setTimestampProperty('timestamp')
+ .setStoreInDataLake()
+ .addProtocolInput('select', 'Unauthenticated', 'check')
+ .addProtocolInput('input', 'host', 'localhost')
+ .addProtocolInput('input', 'port', '9094')
+ .addProtocolInput('click', 'sp-reload', '')
+ .addProtocolInput('select', topicname, 'check')
+ .setFormat('json_object')
+ .build();
+
+ AdapterUtils.testGenericStreamAdapter(adapterInput);
+ });
+
+});
diff --git a/ui/cypress/tests/adapter/machineDataSimulator.ts b/ui/cypress/tests/adapter/machineDataSimulator.ts
index 8fbc1d7..41137f2 100644
--- a/ui/cypress/tests/adapter/machineDataSimulator.ts
+++ b/ui/cypress/tests/adapter/machineDataSimulator.ts
@@ -29,10 +29,23 @@ describe('Test Random Data Simulator Stream Adapter', () => {
.create('Machine_Data_Simulator')
.setName('Machine Data Simulator Test')
.addInput('input', 'wait-time-ms', '1000')
+ .setTimestampProperty('timestamp')
+ .setStoreInDataLake()
.build();
AdapterUtils.testSpecificStreamAdapter(adapterInput);
- AdapterUtils.deleteAdapter();
+
+ // const adapterInput1 = SpecificAdapterBuilder
+ // .create('Machine_Data_Simulator')
+ // .setName('Machine Data Simulator Test 2')
+ // .addInput('input', 'wait-time-ms', '1000')
+ // .addInput('radio', 'selected-simulator-option', 'pressure')
+ // .setTimestampProperty('timestamp')
+ // .setStoreInDataLake()
+ // .build();
+ //
+ // AdapterUtils.testSpecificStreamAdapter(adapterInput1);
+ // AdapterUtils.deleteAdapter();
});
});
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index ef5db7f..e262175 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -31,6 +31,7 @@ import { DialogRef } from '../../../core-ui/dialog/base-dialog/dialog-ref';
import { PipelineTemplateService } from '../../../platform-services/apis/pipeline-template.service';
import { PipelineInvocationBuilder } from '../../../core-services/template/PipelineInvocationBuilder';
+
@Component({
selector: 'sp-dialog-adapter-started-dialog',
templateUrl: './adapter-started-dialog.component.html',
@@ -98,7 +99,9 @@ export class AdapterStartedDialog implements OnInit {
if (this.saveInDataLake) {
const pipelineId = 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate';
- this.pipelineTemplateService.getPipelineTemplateInvocation(this.adapter.adapterId, pipelineId)
+ console.log(x.notifications[0].title);
+ console.log(this.adapter);
+ this.pipelineTemplateService.getPipelineTemplateInvocation(x.notifications[0].title, pipelineId)
.subscribe(res => {
const pipelineName = 'Persist ' + this.adapter.name;
@@ -109,9 +112,10 @@ export class AdapterStartedDialog implements OnInit {
.replace(/\./g, '');
// Ensure that index name is no number
- if (!Number.isNaN(indexName)) {
+ if (this.isNumber(indexName)) {
indexName = 'sp' + indexName;
}
+
const pipelineInvocation = PipelineInvocationBuilder
.create(res)
.setName(pipelineName)
@@ -140,4 +144,9 @@ export class AdapterStartedDialog implements OnInit {
this.shepherdService.trigger('confirm_adapter_started_button');
}
+ private isNumber(value: string | number): boolean {
+ return ((value != null) &&
+ (value !== '') &&
+ !isNaN(Number(value.toString())));
+ }
}
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
index cf6e2c7..52769e8 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
@@ -19,10 +19,14 @@
<div id="formWrapper" fxFlex="100" fxLayout="column">
<div fxFlex fxLayout="column" class="content">
<div>
- <mat-radio-group *ngIf="staticProperty.alternatives.length > 1"
- class="radio-group" (change)="radioSelectionChange($event)" required>
- <mat-radio-button *ngFor="let alternative of staticProperty.alternatives" class="radio-button"
- [value]="alternative" [checked]="alternative.selected" [matTooltip]="alternative.description">
+ <mat-radio-group *ngIf="staticProperty.alternatives.length > 1"
+ class="radio-group" (change)="radioSelectionChange($event)" required>
+ <mat-radio-button class="radio-button"
+ *ngFor="let alternative of staticProperty.alternatives"
+ [value]="alternative"
+ [checked]="alternative.selected"
+ [matTooltip]="alternative.description"
+ [attr.data-cy]="alternative.label">
<label style="font-weight: normal;">
{{alternative.label}}
</label>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
index 55a763f..8feaaf3 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
@@ -19,8 +19,10 @@
<div id="formWrapper" fxFlex="100" fxLayout="column">
<div>
<button mat-button mat-raised-button color="accent" class="small-button"
+ data-cy="sp-reload"
(click)="loadOptionsFromRestApi()"
- style="margin-right:10px;margin-left:10px;" [disabled]="!showOptions">
+ [disabled]="!showOptions"
+ style="margin-right:10px;margin-left:10px;">
<span>Reload</span>
</button>
</div>
@@ -28,7 +30,9 @@
<div fxFlex fxLayout="row">
<div fxLayout="column" *ngIf="showOptions || staticProperty.options" style="margin-left: 10px">
<mat-radio-button *ngFor="let option of staticProperty.options"
- (click)="select(option.elementId)" [checked]="option.selected">
+ (click)="select(option.elementId)"
+ [attr.data-cy]="option.name"
+ [checked]="option.selected">
<label style="font-weight: normal">
{{option.name}}
</label>