You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/19 13:58:18 UTC

[incubator-streampipes] branch rel/0.70.0 updated: [STREAMPIPES-565] Support broker settings override when importing resources

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/rel/0.70.0 by this push:
     new 15808fee1 [STREAMPIPES-565] Support broker settings override when importing resources
15808fee1 is described below

commit 15808fee127c87611d11d677fc29e8a5c1593ea6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Aug 19 15:58:07 2022 +0200

    [STREAMPIPES-565] Support broker settings override when importing resources
---
 .../export/dataimport/PerformImportGenerator.java  |  6 ++--
 .../export/resolver/AdapterResolver.java           | 11 +++++++
 .../export/resolver/DataSourceResolver.java        | 10 ++++++
 .../export/resolver/PipelineResolver.java          | 31 ++++++++++++++++++
 .../export/utils/EventGroundingProcessor.java      | 37 ++++++++++++++++++++++
 .../model/export/AssetExportConfiguration.java     | 10 ++++++
 .../src/lib/model/gen/streampipes-model.ts         |  6 ++--
 .../data-export-dialog.component.html              |  8 ++++-
 .../export-dialog/data-export-dialog.component.ts  |  2 ++
 .../data-import-dialog.component.html              | 12 +++++++
 .../import-dialog/data-import-dialog.component.ts  |  2 ++
 11 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
index 0c66afc3e..f4345f946 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
@@ -61,7 +61,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
   @Override
   protected void handleAdapter(String document, String adapterId) throws JsonProcessingException {
     if (shouldStore(adapterId, config.getAdapters())) {
-      new AdapterResolver().writeDocument(document);
+      new AdapterResolver().writeDocument(document, config.isOverrideBrokerSettings());
       permissionsToStore.add(new PermissionInfo(adapterId, AdapterDescription.class));
     }
   }
@@ -85,7 +85,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
   @Override
   protected void handleDataSource(String document, String dataSourceId) throws JsonProcessingException {
     if (shouldStore(dataSourceId, config.getDataSources())) {
-      new DataSourceResolver().writeDocument(document);
+      new DataSourceResolver().writeDocument(document, config.isOverrideBrokerSettings());
       permissionsToStore.add(new PermissionInfo(dataSourceId, SpDataStream.class));
     }
   }
@@ -93,7 +93,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
   @Override
   protected void handlePipeline(String document, String pipelineId) throws JsonProcessingException {
     if (shouldStore(pipelineId, config.getPipelines())) {
-      new PipelineResolver().writeDocument(document);
+      new PipelineResolver().writeDocument(document, config.isOverrideBrokerSettings());
       permissionsToStore.add(new PermissionInfo(pipelineId, Pipeline.class));
     }
   }
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 2598d9dc3..c879d982c 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -20,6 +20,7 @@
 package org.apache.streampipes.export.resolver;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
@@ -31,6 +32,7 @@ public class AdapterResolver extends AbstractResolver<AdapterDescription> {
   public AdapterDescription findDocument(String resourceId) {
     var doc =  getNoSqlStore().getAdapterInstanceStorage().getAdapter(resourceId);
     doc.setRev(null);
+    doc.setSelectedEndpointUrl(null);
     if (doc instanceof AdapterStreamDescription) {
       ((AdapterStreamDescription) doc).setRunning(false);
     }
@@ -52,6 +54,15 @@ public class AdapterResolver extends AbstractResolver<AdapterDescription> {
     getNoSqlStore().getAdapterInstanceStorage().storeAdapter(deserializeDocument(document));
   }
 
+  public void writeDocument(String document,
+                            boolean overrideDocument) throws JsonProcessingException {
+    var adapterDescription = deserializeDocument(document);
+    if (overrideDocument) {
+      EventGroundingProcessor.applyOverride(adapterDescription.getEventGrounding().getTransportProtocol());
+    }
+    getNoSqlStore().getAdapterInstanceStorage().storeAdapter(adapterDescription);
+  }
+
   @Override
   protected AdapterDescription deserializeDocument(String document) throws JsonProcessingException {
     return this.spMapper.readValue(document, AdapterDescription.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
index 6a79f3d7c..a38bc810a 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.export.resolver;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.export.ExportItem;
@@ -47,6 +48,15 @@ public class DataSourceResolver extends AbstractResolver<SpDataStream> {
     getNoSqlStore().getDataStreamStorage().createElement(deserializeDocument(document));
   }
 
+  public void writeDocument(String document,
+                            boolean overrideDocument) throws JsonProcessingException {
+    var dataStream = deserializeDocument(document);
+    if (overrideDocument) {
+      EventGroundingProcessor.applyOverride(dataStream.getEventGrounding().getTransportProtocol());
+    }
+    getNoSqlStore().getDataStreamStorage().createElement(dataStream);
+  }
+
   @Override
   protected SpDataStream deserializeDocument(String document) throws JsonProcessingException {
     return this.spMapper.readValue(document, SpDataStream.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
index 5ca897458..173289f0c 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
@@ -19,10 +19,14 @@
 package org.apache.streampipes.export.resolver;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
 import org.apache.streampipes.export.utils.SerializationUtils;
+import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.export.ExportItem;
 import org.apache.streampipes.model.pipeline.Pipeline;
 
+import java.util.stream.Collectors;
+
 public class PipelineResolver extends AbstractResolver<Pipeline> {
 
   @Override
@@ -31,6 +35,12 @@ public class PipelineResolver extends AbstractResolver<Pipeline> {
     doc.setRev(null);
     doc.setRestartOnSystemReboot(false);
     doc.setRunning(false);
+    doc.setSepas(doc.getSepas().stream().peek(s -> s.setSelectedEndpointUrl(null)).collect(Collectors.toList()));
+    doc.setActions(doc.getActions().stream().peek(s -> s.setSelectedEndpointUrl(null)).collect(Collectors.toList()));
+    doc.setStreams(doc.getStreams()
+      .stream()
+      .filter(s -> s instanceof SpDataSet).peek(s -> ((SpDataSet) s).setSelectedEndpointUrl(null))
+      .collect(Collectors.toList()));
     return doc;
   }
 
@@ -49,6 +59,27 @@ public class PipelineResolver extends AbstractResolver<Pipeline> {
     getNoSqlStore().getPipelineStorageAPI().storePipeline(deserializeDocument(document));
   }
 
+  public void writeDocument(String document,
+                            boolean overrideDocument) throws JsonProcessingException {
+    var pipeline = deserializeDocument(document);
+    if (overrideDocument) {
+      pipeline.setSepas(pipeline.getSepas().stream().peek(processor -> {
+        processor.getInputStreams().forEach(is -> EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
+        EventGroundingProcessor.applyOverride(processor.getOutputStream().getEventGrounding().getTransportProtocol());
+      }).collect(Collectors.toList()));
+
+      pipeline.setStreams(pipeline.getStreams().stream().peek(stream -> {
+        EventGroundingProcessor.applyOverride(stream.getEventGrounding().getTransportProtocol());
+      }).collect(Collectors.toList()));
+
+      pipeline.setActions(pipeline.getActions().stream().peek(sink -> {
+        sink.getInputStreams().forEach(is -> EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
+      }).collect(Collectors.toList()));
+
+    }
+    getNoSqlStore().getPipelineStorageAPI().storePipeline(pipeline);
+  }
+
   @Override
   protected Pipeline deserializeDocument(String document) throws JsonProcessingException {
     return this.spMapper.readValue(document, Pipeline.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
new file mode 100644
index 000000000..15578de8e
--- /dev/null
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.export.utils;
+
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+public class EventGroundingProcessor {
+
+  public static void applyOverride(TransportProtocol protocol) {
+    if (protocol instanceof KafkaTransportProtocol) {
+      protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
+      ((KafkaTransportProtocol) protocol).setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
+    } else if (protocol instanceof MqttTransportProtocol) {
+      protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
+      ((MqttTransportProtocol) protocol).setPort(BackendConfig.INSTANCE.getMqttPort());
+    }
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
index 525bfae1d..bd1e94625 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
@@ -35,6 +35,8 @@ public class AssetExportConfiguration {
   private Set<ExportItem> pipelines;
   private Set<ExportItem> files;
 
+  private boolean overrideBrokerSettings;
+
   public AssetExportConfiguration() {
     this.adapters = new HashSet<>();
     this.dashboards = new HashSet<>();
@@ -157,4 +159,12 @@ public class AssetExportConfiguration {
   public void addAsset(ExportItem asset) {
     this.assets.add(asset);
   }
+
+  public boolean isOverrideBrokerSettings() {
+    return overrideBrokerSettings;
+  }
+
+  public void setOverrideBrokerSettings(boolean overrideBrokerSettings) {
+    this.overrideBrokerSettings = overrideBrokerSettings;
+  }
 }
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 88c0b73ca..2708db878 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -18,7 +18,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-08-17 14:48:34.
+// Generated using typescript-generator version 2.27.744 on 2022-08-19 14:38:41.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -615,6 +615,7 @@ export class AssetExportConfiguration {
     dataSources: ExportItem[];
     dataViews: ExportItem[];
     files: ExportItem[];
+    overrideBrokerSettings: boolean;
     pipelines: ExportItem[];
 
     static fromData(data: AssetExportConfiguration, target?: AssetExportConfiguration): AssetExportConfiguration {
@@ -632,6 +633,7 @@ export class AssetExportConfiguration {
         instance.dataSources = __getCopyArrayFn(ExportItem.fromData)(data.dataSources);
         instance.pipelines = __getCopyArrayFn(ExportItem.fromData)(data.pipelines);
         instance.files = __getCopyArrayFn(ExportItem.fromData)(data.files);
+        instance.overrideBrokerSettings = data.overrideBrokerSettings;
         return instance;
     }
 }
@@ -2639,9 +2641,9 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
         const instance = target || new PipelineTemplateDescription();
         super.fromData(data, instance);
         instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
-        instance.pipelineTemplateId = data.pipelineTemplateId;
         instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
         instance.pipelineTemplateName = data.pipelineTemplateName;
+        instance.pipelineTemplateId = data.pipelineTemplateId;
         return instance;
     }
 }
diff --git a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
index 7be850b07..3dbfec26e 100644
--- a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
+++ b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
@@ -18,7 +18,7 @@
 
 <div class="sp-dialog-container">
     <div class="sp-dialog-content">
-        <div fxFlex="100" fxLayout="column" class="p-15" *ngIf="preview">
+        <div fxFlex="100" fxLayout="column" class="p-15" *ngIf="preview && !exportInProgress">
             <div *ngFor="let config of preview.assetExportConfiguration">
                 <h4>Exported items {{config.assetName}}</h4>
                 <sp-data-export-item [exportItems]="config.adapters" sectionTitle="Adapters"></sp-data-export-item>
@@ -30,6 +30,12 @@
                 <sp-data-export-item [exportItems]="config.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
             </div>
         </div>
+        <div fxFlex="100" fxLayout="column" *ngIf="exportInProgress" class="mt-10" fxLayoutAlign="center center">
+            <mat-spinner [diameter]="50" color="accent"></mat-spinner>
+            <h4 class="mt-10">Exporting resources...</h4>
+            <h5 class="mt-10">Depending on number and size of exported files, this might take a while...</h5>
+        </div>
+
     </div>
     <mat-divider></mat-divider>
     <div class="sp-dialog-actions">
diff --git a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
index d5ac02de0..df8d7d80c 100644
--- a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
+++ b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
@@ -32,6 +32,7 @@ export class SpDataExportDialogComponent implements OnInit {
   selectedAssets: string[];
 
   preview: ExportConfiguration;
+  exportInProgress = false;
 
   constructor(private dialogRef: DialogRef<SpDataExportDialogComponent>,
               private dataExportService: DataExportService) {
@@ -49,6 +50,7 @@ export class SpDataExportDialogComponent implements OnInit {
   }
 
   generateDownloadPackage(): void {
+    this.exportInProgress = true;
     this.dataExportService.triggerExport(this.preview).subscribe(result => {
       this.downloadFile(result);
     });
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
index 01bc6c817..630b7cb63 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
@@ -53,7 +53,19 @@
             <sp-data-export-item [exportItems]="importConfiguration.dataLakeMeasures" sectionTitle="Data Lake Storage"></sp-data-export-item>
             <sp-data-export-item [exportItems]="importConfiguration.files" sectionTitle="Files"></sp-data-export-item>
             <sp-data-export-item [exportItems]="importConfiguration.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
+
+            <div fxFlex="100" fxLayout="column" *ngIf="currentImportStep === 1" class="mt-10">
+                <h4>Import options</h4>
+                <div fxLayout="column"*ngIf="importConfiguration">
+                    <mat-checkbox [(ngModel)]="importConfiguration.overrideBrokerSettings">Use broker settings from this instance</mat-checkbox>
+                </div>
+            </div>
         </div>
+        <div fxFlex="100" fxLayout="column" *ngIf="currentImportStep === 2" class="mt-10" fxLayoutAlign="center center">
+            <mat-spinner [diameter]="50" color="accent"></mat-spinner>
+            <h4 class="mt-10">Importing resources...</h4>
+        </div>
+
     </div>
     <mat-divider></mat-divider>
     <div class="sp-dialog-actions">
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
index f8da6c2ac..ea40c28a1 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
@@ -66,6 +66,7 @@ export class SpDataImportDialogComponent implements OnInit {
             this.uploadStatus = Math.round(100 * event.loaded / event.total);
           } else if (event instanceof HttpResponse) {
            this.importConfiguration = event.body as AssetExportConfiguration;
+           this.importConfiguration.overrideBrokerSettings = true;
            this.currentImportStep++;
           }
         },
@@ -76,6 +77,7 @@ export class SpDataImportDialogComponent implements OnInit {
   }
 
   performImport(): void {
+    this.currentImportStep = 2;
     this.dataExportService.triggerImport(this.selectedUploadFile, this.importConfiguration).subscribe(result => {
       this.dialogRef.close();
     });