You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/19 13:58:18 UTC
[incubator-streampipes] branch rel/0.70.0 updated: [STREAMPIPES-565] Support broker settings override when importing resources
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/rel/0.70.0 by this push:
new 15808fee1 [STREAMPIPES-565] Support broker settings override when importing resources
15808fee1 is described below
commit 15808fee127c87611d11d677fc29e8a5c1593ea6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Aug 19 15:58:07 2022 +0200
[STREAMPIPES-565] Support broker settings override when importing resources
---
.../export/dataimport/PerformImportGenerator.java | 6 ++--
.../export/resolver/AdapterResolver.java | 11 +++++++
.../export/resolver/DataSourceResolver.java | 10 ++++++
.../export/resolver/PipelineResolver.java | 31 ++++++++++++++++++
.../export/utils/EventGroundingProcessor.java | 37 ++++++++++++++++++++++
.../model/export/AssetExportConfiguration.java | 10 ++++++
.../src/lib/model/gen/streampipes-model.ts | 6 ++--
.../data-export-dialog.component.html | 8 ++++-
.../export-dialog/data-export-dialog.component.ts | 2 ++
.../data-import-dialog.component.html | 12 +++++++
.../import-dialog/data-import-dialog.component.ts | 2 ++
11 files changed, 129 insertions(+), 6 deletions(-)
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
index 0c66afc3e..f4345f946 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
@@ -61,7 +61,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
@Override
protected void handleAdapter(String document, String adapterId) throws JsonProcessingException {
if (shouldStore(adapterId, config.getAdapters())) {
- new AdapterResolver().writeDocument(document);
+ new AdapterResolver().writeDocument(document, config.isOverrideBrokerSettings());
permissionsToStore.add(new PermissionInfo(adapterId, AdapterDescription.class));
}
}
@@ -85,7 +85,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
@Override
protected void handleDataSource(String document, String dataSourceId) throws JsonProcessingException {
if (shouldStore(dataSourceId, config.getDataSources())) {
- new DataSourceResolver().writeDocument(document);
+ new DataSourceResolver().writeDocument(document, config.isOverrideBrokerSettings());
permissionsToStore.add(new PermissionInfo(dataSourceId, SpDataStream.class));
}
}
@@ -93,7 +93,7 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
@Override
protected void handlePipeline(String document, String pipelineId) throws JsonProcessingException {
if (shouldStore(pipelineId, config.getPipelines())) {
- new PipelineResolver().writeDocument(document);
+ new PipelineResolver().writeDocument(document, config.isOverrideBrokerSettings());
permissionsToStore.add(new PermissionInfo(pipelineId, Pipeline.class));
}
}
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index 2598d9dc3..c879d982c 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -20,6 +20,7 @@
package org.apache.streampipes.export.resolver;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
@@ -31,6 +32,7 @@ public class AdapterResolver extends AbstractResolver<AdapterDescription> {
public AdapterDescription findDocument(String resourceId) {
var doc = getNoSqlStore().getAdapterInstanceStorage().getAdapter(resourceId);
doc.setRev(null);
+ doc.setSelectedEndpointUrl(null);
if (doc instanceof AdapterStreamDescription) {
((AdapterStreamDescription) doc).setRunning(false);
}
@@ -52,6 +54,15 @@ public class AdapterResolver extends AbstractResolver<AdapterDescription> {
getNoSqlStore().getAdapterInstanceStorage().storeAdapter(deserializeDocument(document));
}
+ public void writeDocument(String document,
+ boolean overrideDocument) throws JsonProcessingException {
+ var adapterDescription = deserializeDocument(document);
+ if (overrideDocument) {
+ EventGroundingProcessor.applyOverride(adapterDescription.getEventGrounding().getTransportProtocol());
+ }
+ getNoSqlStore().getAdapterInstanceStorage().storeAdapter(adapterDescription);
+ }
+
@Override
protected AdapterDescription deserializeDocument(String document) throws JsonProcessingException {
return this.spMapper.readValue(document, AdapterDescription.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
index 6a79f3d7c..a38bc810a 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/DataSourceResolver.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.export.resolver;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.export.ExportItem;
@@ -47,6 +48,15 @@ public class DataSourceResolver extends AbstractResolver<SpDataStream> {
getNoSqlStore().getDataStreamStorage().createElement(deserializeDocument(document));
}
+ public void writeDocument(String document,
+ boolean overrideDocument) throws JsonProcessingException {
+ var dataStream = deserializeDocument(document);
+ if (overrideDocument) {
+ EventGroundingProcessor.applyOverride(dataStream.getEventGrounding().getTransportProtocol());
+ }
+ getNoSqlStore().getDataStreamStorage().createElement(dataStream);
+ }
+
@Override
protected SpDataStream deserializeDocument(String document) throws JsonProcessingException {
return this.spMapper.readValue(document, SpDataStream.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
index 5ca897458..173289f0c 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/PipelineResolver.java
@@ -19,10 +19,14 @@
package org.apache.streampipes.export.resolver;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.EventGroundingProcessor;
import org.apache.streampipes.export.utils.SerializationUtils;
+import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.export.ExportItem;
import org.apache.streampipes.model.pipeline.Pipeline;
+import java.util.stream.Collectors;
+
public class PipelineResolver extends AbstractResolver<Pipeline> {
@Override
@@ -31,6 +35,12 @@ public class PipelineResolver extends AbstractResolver<Pipeline> {
doc.setRev(null);
doc.setRestartOnSystemReboot(false);
doc.setRunning(false);
+ doc.setSepas(doc.getSepas().stream().peek(s -> s.setSelectedEndpointUrl(null)).collect(Collectors.toList()));
+ doc.setActions(doc.getActions().stream().peek(s -> s.setSelectedEndpointUrl(null)).collect(Collectors.toList()));
+ doc.setStreams(doc.getStreams()
+ .stream()
+ .filter(s -> s instanceof SpDataSet).peek(s -> ((SpDataSet) s).setSelectedEndpointUrl(null))
+ .collect(Collectors.toList()));
return doc;
}
@@ -49,6 +59,27 @@ public class PipelineResolver extends AbstractResolver<Pipeline> {
getNoSqlStore().getPipelineStorageAPI().storePipeline(deserializeDocument(document));
}
+  /**
+   * Stores an imported pipeline. When {@code overrideDocument} is set, the transport protocol of every
+   * processor input/output stream, pipeline stream and sink input stream is first passed to
+   * {@link EventGroundingProcessor#applyOverride}.
+   */
+  public void writeDocument(String document,
+                            boolean overrideDocument) throws JsonProcessingException {
+    var pipeline = deserializeDocument(document);
+    if (overrideDocument) {
+      // The elements are mutated in place, so the lists themselves need no re-assignment.
+      pipeline.getSepas().forEach(processor -> {
+        processor.getInputStreams().forEach(is -> EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol()));
+        EventGroundingProcessor.applyOverride(processor.getOutputStream().getEventGrounding().getTransportProtocol());
+      });
+      pipeline.getStreams().forEach(stream -> EventGroundingProcessor.applyOverride(stream.getEventGrounding().getTransportProtocol()));
+      pipeline.getActions().forEach(sink -> sink.getInputStreams()
+          .forEach(is -> EventGroundingProcessor.applyOverride(is.getEventGrounding().getTransportProtocol())));
+    }
+    getNoSqlStore().getPipelineStorageAPI().storePipeline(pipeline);
+  }
+
@Override
protected Pipeline deserializeDocument(String document) throws JsonProcessingException {
return this.spMapper.readValue(document, Pipeline.class);
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
new file mode 100644
index 000000000..15578de8e
--- /dev/null
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/EventGroundingProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.export.utils;
+
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+public class EventGroundingProcessor {
+ // Sets broker hostname and port of Kafka/MQTT protocols from BackendConfig.INSTANCE; any other protocol (or null) is left unchanged.
+ public static void applyOverride(TransportProtocol protocol) {
+ if (protocol instanceof KafkaTransportProtocol) {
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getKafkaHost());
+ ((KafkaTransportProtocol) protocol).setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
+ } else if (protocol instanceof MqttTransportProtocol) {
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getMqttHost());
+ ((MqttTransportProtocol) protocol).setPort(BackendConfig.INSTANCE.getMqttPort());
+ }
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
index 525bfae1d..bd1e94625 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/export/AssetExportConfiguration.java
@@ -35,6 +35,8 @@ public class AssetExportConfiguration {
private Set<ExportItem> pipelines;
private Set<ExportItem> files;
+ private boolean overrideBrokerSettings;
+
public AssetExportConfiguration() {
this.adapters = new HashSet<>();
this.dashboards = new HashSet<>();
@@ -157,4 +159,12 @@ public class AssetExportConfiguration {
public void addAsset(ExportItem asset) {
this.assets.add(asset);
}
+
+ public boolean isOverrideBrokerSettings() { // flag handed to the resolvers' writeDocument(document, override) calls during import
+ return overrideBrokerSettings;
+ }
+
+ public void setOverrideBrokerSettings(boolean overrideBrokerSettings) { // plain setter; boolean field defaults to false when never set
+ this.overrideBrokerSettings = overrideBrokerSettings;
+ }
}
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 88c0b73ca..2708db878 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -18,7 +18,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-08-17 14:48:34.
+// Generated using typescript-generator version 2.27.744 on 2022-08-19 14:38:41.
export class AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -615,6 +615,7 @@ export class AssetExportConfiguration {
dataSources: ExportItem[];
dataViews: ExportItem[];
files: ExportItem[];
+ overrideBrokerSettings: boolean;
pipelines: ExportItem[];
static fromData(data: AssetExportConfiguration, target?: AssetExportConfiguration): AssetExportConfiguration {
@@ -632,6 +633,7 @@ export class AssetExportConfiguration {
instance.dataSources = __getCopyArrayFn(ExportItem.fromData)(data.dataSources);
instance.pipelines = __getCopyArrayFn(ExportItem.fromData)(data.pipelines);
instance.files = __getCopyArrayFn(ExportItem.fromData)(data.files);
+ instance.overrideBrokerSettings = data.overrideBrokerSettings;
return instance;
}
}
@@ -2639,9 +2641,9 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
const instance = target || new PipelineTemplateDescription();
super.fromData(data, instance);
instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
- instance.pipelineTemplateId = data.pipelineTemplateId;
instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
instance.pipelineTemplateName = data.pipelineTemplateName;
+ instance.pipelineTemplateId = data.pipelineTemplateId;
return instance;
}
}
diff --git a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
index 7be850b07..3dbfec26e 100644
--- a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
+++ b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
@@ -18,7 +18,7 @@
<div class="sp-dialog-container">
<div class="sp-dialog-content">
- <div fxFlex="100" fxLayout="column" class="p-15" *ngIf="preview">
+ <div fxFlex="100" fxLayout="column" class="p-15" *ngIf="preview && !exportInProgress">
<div *ngFor="let config of preview.assetExportConfiguration">
<h4>Exported items {{config.assetName}}</h4>
<sp-data-export-item [exportItems]="config.adapters" sectionTitle="Adapters"></sp-data-export-item>
@@ -30,6 +30,12 @@
<sp-data-export-item [exportItems]="config.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
</div>
</div>
+ <div fxFlex="100" fxLayout="column" *ngIf="exportInProgress" class="mt-10" fxLayoutAlign="center center">
+ <mat-spinner [diameter]="50" color="accent"></mat-spinner>
+ <h4 class="mt-10">Exporting resources...</h4>
+ <h5 class="mt-10">Depending on number and size of exported files, this might take a while...</h5>
+ </div>
+
</div>
<mat-divider></mat-divider>
<div class="sp-dialog-actions">
diff --git a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
index d5ac02de0..df8d7d80c 100644
--- a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
+++ b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.ts
@@ -32,6 +32,7 @@ export class SpDataExportDialogComponent implements OnInit {
selectedAssets: string[];
preview: ExportConfiguration;
+ exportInProgress = false;
constructor(private dialogRef: DialogRef<SpDataExportDialogComponent>,
private dataExportService: DataExportService) {
@@ -49,6 +50,7 @@ export class SpDataExportDialogComponent implements OnInit {
}
generateDownloadPackage(): void {
+ this.exportInProgress = true;
this.dataExportService.triggerExport(this.preview).subscribe(result => {
this.downloadFile(result);
});
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
index 01bc6c817..630b7cb63 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
@@ -53,7 +53,19 @@
<sp-data-export-item [exportItems]="importConfiguration.dataLakeMeasures" sectionTitle="Data Lake Storage"></sp-data-export-item>
<sp-data-export-item [exportItems]="importConfiguration.files" sectionTitle="Files"></sp-data-export-item>
<sp-data-export-item [exportItems]="importConfiguration.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
+
+        <div fxFlex="100" fxLayout="column" *ngIf="currentImportStep === 1" class="mt-10">
+            <h4>Import options</h4> <!-- options bound to importConfiguration, which is submitted with the import request -->
+            <div fxLayout="column" *ngIf="importConfiguration">
+                <mat-checkbox [(ngModel)]="importConfiguration.overrideBrokerSettings">Use broker settings from this instance</mat-checkbox>
+            </div>
+        </div>
</div>
+ <div fxFlex="100" fxLayout="column" *ngIf="currentImportStep === 2" class="mt-10" fxLayoutAlign="center center">
+ <mat-spinner [diameter]="50" color="accent"></mat-spinner>
+ <h4 class="mt-10">Importing resources...</h4>
+ </div>
+
</div>
<mat-divider></mat-divider>
<div class="sp-dialog-actions">
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
index f8da6c2ac..ea40c28a1 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
@@ -66,6 +66,7 @@ export class SpDataImportDialogComponent implements OnInit {
this.uploadStatus = Math.round(100 * event.loaded / event.total);
} else if (event instanceof HttpResponse) {
this.importConfiguration = event.body as AssetExportConfiguration;
+ this.importConfiguration.overrideBrokerSettings = true;
this.currentImportStep++;
}
},
@@ -76,6 +77,7 @@ export class SpDataImportDialogComponent implements OnInit {
}
performImport(): void {
+ this.currentImportStep = 2;
this.dataExportService.triggerImport(this.selectedUploadFile, this.importConfiguration).subscribe(result => {
this.dialogRef.close();
});