You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/01 16:19:07 UTC

[incubator-streampipes] branch dev updated: [hotfix] Fix data lake pipeline for generic adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46a220d  [hotfix] Fix data lake pipeline for generic adapters
46a220d is described below

commit 46a220db6bddfcb1e1c25dd7b070e270a5f2141b
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri Oct 1 18:17:33 2021 +0200

    [hotfix] Fix data lake pipeline for generic adapters
---
 .../master/management/AdapterMasterManagement.java |  4 +-
 .../master/management/SourcesManagement.java       |  4 +-
 .../master/management/WorkerRestClient.java        |  7 +-
 .../support/builder/GenericAdapterBuilder.ts       | 87 ++++++++++++----------
 ui/cypress/support/builder/PipelineBuilder.ts      |  1 +
 ui/cypress/support/utils/StaticPropertyUtils.ts    | 30 ++++----
 ui/cypress/tests/adapter/KafkaAdapter.ts           | 73 ++++++++++++++++++
 ui/cypress/tests/adapter/machineDataSimulator.ts   | 15 +++-
 .../adapter-started-dialog.component.ts            | 13 +++-
 .../static-alternatives.component.html             | 12 ++-
 ...c-runtime-resolvable-oneof-input.component.html |  8 +-
 11 files changed, 182 insertions(+), 72 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index f2ca003..c1c40ad 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -102,13 +102,13 @@ public class AdapterMasterManagement {
 
     // start when stream adapter
     if (ad instanceof AdapterStreamDescription) {
-      // TODO
       WorkerRestClient.invokeStreamAdapter(endpointUrl, adapterId);
       LOG.info("Start adapter");
     }
 
     LOG.info("Install source (source URL: {} in backend", ad.getElementId());
-    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getAdapterId());
+    storedDescription.setCorrespondingAdapterId(adapterId);
     installDataSource(storedDescription, username);
 
     return storedDescription.getElementId();
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 849ea95..b731aab 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -140,7 +140,7 @@ public class SourcesManagement {
         AdapterDescription adapterDescription = null;
         List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
         for (AdapterDescription a : allAdapters) {
-            if (a.getUri().endsWith(id)) {
+            if (a.getAdapterId().equals(id)) {
                 adapterDescription = a;
             }
         }
@@ -182,7 +182,7 @@ public class SourcesManagement {
 
         ds.setName(adapterDescription.getName());
         ds.setDescription(adapterDescription.getDescription());
-        ds.setCorrespondingAdapterId(adapterDescription.getAdapterId());
+        ds.setCorrespondingAdapterId(adapterDescription.getAppId());
         ds.setInternallyManaged(true);
 
         ds.setUri(url);
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index cb37eb2..04d33d0 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -41,7 +41,6 @@ import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.UUID;
 
 public class WorkerRestClient {
 
@@ -84,9 +83,9 @@ public class WorkerRestClient {
             logger.info("Trying to start adapter on endpoint: " + url);
 
             // this ensures that all adapters have a valid uri otherwise the json-ld serializer fails
-            if (ad.getUri() == null) {
-                ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
-            }
+//            if (ad.getUri() == null) {
+//                ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
+//            }
 
             String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 
diff --git a/ui/cypress/support/builder/GenericAdapterBuilder.ts b/ui/cypress/support/builder/GenericAdapterBuilder.ts
index d496758..279e92e 100644
--- a/ui/cypress/support/builder/GenericAdapterBuilder.ts
+++ b/ui/cypress/support/builder/GenericAdapterBuilder.ts
@@ -20,57 +20,62 @@ import { UserInput } from '../model/UserInput';
 import { GenericAdapterInput } from '../model/GenericAdapterInput';
 
 export class GenericAdapterBuilder {
-    genericAdapterInput: GenericAdapterInput;
+  genericAdapterInput: GenericAdapterInput;
 
-    constructor(type: string) {
-        this.genericAdapterInput = new GenericAdapterInput();
-        this.genericAdapterInput.adapterType = type;
-        this.genericAdapterInput.protocolConfiguration = [];
-        this.genericAdapterInput.formatConfiguration = [];
-    }
+  constructor(type: string) {
+    this.genericAdapterInput = new GenericAdapterInput();
+    this.genericAdapterInput.adapterType = type;
+    this.genericAdapterInput.protocolConfiguration = [];
+    this.genericAdapterInput.formatConfiguration = [];
+  }
 
-    public static create(name: string) {
-        return new GenericAdapterBuilder(name);
-    }
+  public static create(name: string) {
+    return new GenericAdapterBuilder(name);
+  }
 
-    public setName(name: string) {
-        this.genericAdapterInput.adapterName = name;
-        return this;
-    }
+  public setName(name: string) {
+    this.genericAdapterInput.adapterName = name;
+    return this;
+  }
 
-    public setTimestampProperty(timestsmpProperty: string) {
-        this.genericAdapterInput.timestampProperty = timestsmpProperty;
-        return this;
-    }
+  public setTimestampProperty(timestsmpProperty: string) {
+    this.genericAdapterInput.timestampProperty = timestsmpProperty;
+    return this;
+  }
 
-    public addProtocolInput(type: string, selector: string, value: string) {
-        const  userInput = new UserInput();
-        userInput.type = type;
-        userInput.selector = selector;
-        userInput.value = value;
+  public setStoreInDataLake() {
+    this.genericAdapterInput.storeInDataLake = true;
+    return this;
+  }
 
-        this.genericAdapterInput.protocolConfiguration.push(userInput);
+  public addProtocolInput(type: string, selector: string, value: string) {
+    const userInput = new UserInput();
+    userInput.type = type;
+    userInput.selector = selector;
+    userInput.value = value;
 
-        return this;
-    }
+    this.genericAdapterInput.protocolConfiguration.push(userInput);
 
-    public setFormat(format: string) {
-       this.genericAdapterInput.format = format;
-       return this;
-    }
+    return this;
+  }
 
-    public addFormatInput(type: string, selector: string, value: string) {
-        const  userInput = new UserInput();
-        userInput.type = type;
-        userInput.selector = selector;
-        userInput.value = value;
+  public setFormat(format: string) {
+    this.genericAdapterInput.format = format;
+    return this;
+  }
 
-        this.genericAdapterInput.formatConfiguration.push(userInput);
+  public addFormatInput(type: string, selector: string, value: string) {
+    const userInput = new UserInput();
+    userInput.type = type;
+    userInput.selector = selector;
+    userInput.value = value;
 
-        return this;
-    }
+    this.genericAdapterInput.formatConfiguration.push(userInput);
 
-    build() {
-        return this.genericAdapterInput;
-    }
+    return this;
+  }
+
+  build() {
+    return this.genericAdapterInput;
+  }
 }
diff --git a/ui/cypress/support/builder/PipelineBuilder.ts b/ui/cypress/support/builder/PipelineBuilder.ts
index 0564aa8..7b69f91 100644
--- a/ui/cypress/support/builder/PipelineBuilder.ts
+++ b/ui/cypress/support/builder/PipelineBuilder.ts
@@ -42,6 +42,7 @@ export class PipelineBuilder {
 
         return this;
     }
+
     public addProcessingElement(processingElement: PipelineElementInput) {
         this.pipeline.processingElement = processingElement;
 
diff --git a/ui/cypress/support/utils/StaticPropertyUtils.ts b/ui/cypress/support/utils/StaticPropertyUtils.ts
index 03906f9..5216e02 100644
--- a/ui/cypress/support/utils/StaticPropertyUtils.ts
+++ b/ui/cypress/support/utils/StaticPropertyUtils.ts
@@ -20,19 +20,21 @@ import { UserInput } from '../model/UserInput';
 
 export class StaticPropertyUtils {
 
-    public static input(configs: UserInput[]) {
+  public static input(configs: UserInput[]) {
 
-        // Configure Properties
-        configs.forEach(config => {
-            if (config.type === 'checkbox') {
-                cy.dataCy(config.selector).children().click();
-            } else if (config.type === 'drop-down') {
-                cy.dataCy(config.selector).click().get('mat-option').contains(config.value).click();
-            }  else if (config.type === 'radio') {
-                cy.dataCy(config.selector + config.value ).click();
-            } else {
-                cy.dataCy(config.selector).type(config.value);
-            }
-        });
-    }
+    // Configure Properties
+    configs.forEach(config => {
+      if (config.type === 'checkbox') {
+        cy.dataCy(config.selector).children().click();
+      } else if (config.type === 'drop-down') {
+        cy.dataCy(config.selector).click().get('mat-option').contains(config.value).click();
+      } else if (config.type === 'radio') {
+        cy.dataCy(config.selector + config.value).click();
+      } else if (config.type === 'click') {
+        cy.dataCy(config.selector).click({ force: true });
+      } else {
+        cy.dataCy(config.selector).type(config.value);
+      }
+    });
+  }
 }
diff --git a/ui/cypress/tests/adapter/KafkaAdapter.ts b/ui/cypress/tests/adapter/KafkaAdapter.ts
new file mode 100644
index 0000000..089ff23
--- /dev/null
+++ b/ui/cypress/tests/adapter/KafkaAdapter.ts
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { AdapterUtils } from '../../support/utils/AdapterUtils';
+import { GenericAdapterBuilder } from '../../support/builder/GenericAdapterBuilder';
+import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';
+import { PipelineBuilder } from '../../support/builder/PipelineBuilder';
+import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
+import { PipelineUtils } from '../../support/utils/PipelineUtils';
+
+describe('Test Random Data Simulator Stream Adapter', () => {
+  before('Setup Test', () => {
+    cy.initStreamPipesTest();
+  });
+
+  it('Perform Test', () => {
+    const simulatorAdapterName = 'simulator';
+
+
+    const machineAdapter = SpecificAdapterBuilder
+      .create('Machine_Data_Simulator')
+      .setName(simulatorAdapterName)
+      .addInput('input', 'wait-time-ms', '1000')
+      .build();
+
+    AdapterUtils.testSpecificStreamAdapter(machineAdapter);
+
+    const topicname = 'cypresstopic';
+    const pipelineInput = PipelineBuilder.create('Pipeline Test')
+      .addSource(simulatorAdapterName)
+      .addSink(
+        PipelineElementBuilder.create('kafka_publisher')
+          .addInput('select', 'Unauthenticated', 'check')
+          .addInput('input', 'host', 'localhost')
+          .addInput('input', 'port', '{backspace}{backspace}{backspace}{backspace}9094')
+          .addInput('input', 'topic', topicname)
+          .build())
+      .build();
+
+    PipelineUtils.testPipeline(pipelineInput);
+
+    const adapterInput = GenericAdapterBuilder
+      .create('Apache_Kafka')
+      .setName('Kafka4')
+      .setTimestampProperty('timestamp')
+      .setStoreInDataLake()
+      .addProtocolInput('select', 'Unauthenticated', 'check')
+      .addProtocolInput('input', 'host', 'localhost')
+      .addProtocolInput('input', 'port', '9094')
+      .addProtocolInput('click', 'sp-reload', '')
+      .addProtocolInput('select', topicname, 'check')
+      .setFormat('json_object')
+      .build();
+
+    AdapterUtils.testGenericStreamAdapter(adapterInput);
+  });
+
+});
diff --git a/ui/cypress/tests/adapter/machineDataSimulator.ts b/ui/cypress/tests/adapter/machineDataSimulator.ts
index 8fbc1d7..41137f2 100644
--- a/ui/cypress/tests/adapter/machineDataSimulator.ts
+++ b/ui/cypress/tests/adapter/machineDataSimulator.ts
@@ -29,10 +29,23 @@ describe('Test Random Data Simulator Stream Adapter', () => {
       .create('Machine_Data_Simulator')
       .setName('Machine Data Simulator Test')
       .addInput('input', 'wait-time-ms', '1000')
+      .setTimestampProperty('timestamp')
+      .setStoreInDataLake()
       .build();
 
     AdapterUtils.testSpecificStreamAdapter(adapterInput);
-    AdapterUtils.deleteAdapter();
+
+    // const adapterInput1 = SpecificAdapterBuilder
+    //   .create('Machine_Data_Simulator')
+    //   .setName('Machine Data Simulator Test 2')
+    //   .addInput('input', 'wait-time-ms', '1000')
+    //   .addInput('radio', 'selected-simulator-option', 'pressure')
+    //   .setTimestampProperty('timestamp')
+    //   .setStoreInDataLake()
+    //   .build();
+    //
+    // AdapterUtils.testSpecificStreamAdapter(adapterInput1);
+    // AdapterUtils.deleteAdapter();
   });
 
 });
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index ef5db7f..e262175 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -31,6 +31,7 @@ import { DialogRef } from '../../../core-ui/dialog/base-dialog/dialog-ref';
 import { PipelineTemplateService } from '../../../platform-services/apis/pipeline-template.service';
 import { PipelineInvocationBuilder } from '../../../core-services/template/PipelineInvocationBuilder';
 
+
 @Component({
   selector: 'sp-dialog-adapter-started-dialog',
   templateUrl: './adapter-started-dialog.component.html',
@@ -98,7 +99,9 @@ export class AdapterStartedDialog implements OnInit {
 
           if (this.saveInDataLake) {
             const pipelineId = 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate';
-            this.pipelineTemplateService.getPipelineTemplateInvocation(this.adapter.adapterId, pipelineId)
+            console.log(x.notifications[0].title);
+            console.log(this.adapter);
+            this.pipelineTemplateService.getPipelineTemplateInvocation(x.notifications[0].title, pipelineId)
               .subscribe(res => {
 
                 const pipelineName = 'Persist ' + this.adapter.name;
@@ -109,9 +112,10 @@ export class AdapterStartedDialog implements OnInit {
                   .replace(/\./g, '');
 
                 // Ensure that index name is no number
-                if (!Number.isNaN(indexName)) {
+                if (this.isNumber(indexName)) {
                   indexName = 'sp' + indexName;
                 }
+
                 const pipelineInvocation = PipelineInvocationBuilder
                   .create(res)
                   .setName(pipelineName)
@@ -140,4 +144,9 @@ export class AdapterStartedDialog implements OnInit {
     this.shepherdService.trigger('confirm_adapter_started_button');
   }
 
+  private isNumber(value: string | number): boolean {
+    return ((value != null) &&
+      (value !== '') &&
+      !isNaN(Number(value.toString())));
+  }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
index cf6e2c7..52769e8 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
@@ -19,10 +19,14 @@
 <div id="formWrapper" fxFlex="100" fxLayout="column">
     <div fxFlex fxLayout="column" class="content">
         <div>
-            <mat-radio-group  *ngIf="staticProperty.alternatives.length > 1"
-                    class="radio-group" (change)="radioSelectionChange($event)" required>
-                <mat-radio-button *ngFor="let alternative of staticProperty.alternatives" class="radio-button"
-                                  [value]="alternative" [checked]="alternative.selected" [matTooltip]="alternative.description">
+            <mat-radio-group *ngIf="staticProperty.alternatives.length > 1"
+                             class="radio-group" (change)="radioSelectionChange($event)" required>
+                <mat-radio-button class="radio-button"
+                                  *ngFor="let alternative of staticProperty.alternatives"
+                                  [value]="alternative"
+                                  [checked]="alternative.selected"
+                                  [matTooltip]="alternative.description"
+                                  [attr.data-cy]="alternative.label">
                     <label style="font-weight: normal;">
                         {{alternative.label}}
                     </label>
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
index 55a763f..8feaaf3 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.html
@@ -19,8 +19,10 @@
 <div id="formWrapper" fxFlex="100" fxLayout="column">
     <div>
         <button mat-button mat-raised-button color="accent" class="small-button"
+                data-cy="sp-reload"
                 (click)="loadOptionsFromRestApi()"
-                style="margin-right:10px;margin-left:10px;" [disabled]="!showOptions">
+                [disabled]="!showOptions"
+                style="margin-right:10px;margin-left:10px;">
             <span>Reload</span>
         </button>
     </div>
@@ -28,7 +30,9 @@
         <div fxFlex fxLayout="row">
             <div fxLayout="column" *ngIf="showOptions || staticProperty.options" style="margin-left: 10px">
                 <mat-radio-button *ngFor="let option of staticProperty.options"
-                                  (click)="select(option.elementId)" [checked]="option.selected">
+                                  (click)="select(option.elementId)"
+                                  [attr.data-cy]="option.name"
+                                  [checked]="option.selected">
                     <label style="font-weight: normal">
                         {{option.name}}
                     </label>