You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/08 14:15:00 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-65] Support import and export of files
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f08476979 [STREAMPIPES-65] Support import and export of files
new 2795d664e Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
f08476979 is described below
commit f08476979fb3e1f218e25ef7402d475ad27e5079
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Aug 8 16:14:32 2022 +0200
[STREAMPIPES-65] Support import and export of files
---
.../streampipes/commons/zip/ZipFileExtractor.java | 7 ++-
streampipes-data-export/pom.xml | 5 ++
.../streampipes/export/AssetLinkResolver.java | 1 +
.../export/constants/ResolvableAssetLinks.java | 1 +
.../export/dataimport/ImportGenerator.java | 36 +++++++++------
.../export/dataimport/PerformImportGenerator.java | 18 ++++++--
.../export/dataimport/PreviewImportGenerator.java | 11 ++++-
.../export/generator/ExportPackageGenerator.java | 13 ++++++
.../export/generator/ZipFileBuilder.java | 4 ++
.../streampipes/export/resolver/FileResolver.java | 54 ++++++++++++++++++++++
.../data-export-dialog.component.html | 1 +
.../data-import-dialog.component.html | 1 +
.../import-dialog/data-import-dialog.component.ts | 1 -
13 files changed, 130 insertions(+), 23 deletions(-)
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/zip/ZipFileExtractor.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/zip/ZipFileExtractor.java
index 7fdaae804..12e5af641 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/zip/ZipFileExtractor.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/zip/ZipFileExtractor.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.commons.zip;
import java.io.*;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
@@ -33,9 +32,9 @@ public class ZipFileExtractor {
}
// TODO used by export feature - extend this to support binaries
- public Map<String, String> extractZipToStringMap() throws IOException {
+ public Map<String, byte[]> extractZipToMap() throws IOException {
byte[] buffer = new byte[1024];
- Map<String, String> entries = new HashMap<>();
+ Map<String, byte[]> entries = new HashMap<>();
ZipInputStream zis = new ZipInputStream(zipInputStream);
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
@@ -44,7 +43,7 @@ public class ZipFileExtractor {
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
- entries.put(sanitizeName(zipEntry.getName()), fos.toString(StandardCharsets.UTF_8));
+ entries.put(sanitizeName(zipEntry.getName()), fos.toByteArray());
fos.close();
zipEntry = zis.getNextEntry();
}
diff --git a/streampipes-data-export/pom.xml b/streampipes-data-export/pom.xml
index ee886975d..a8cfc5545 100644
--- a/streampipes-data-export/pom.xml
+++ b/streampipes-data-export/pom.xml
@@ -34,6 +34,11 @@
<artifactId>streampipes-model</artifactId>
<version>0.70.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-management</artifactId>
+ <version>0.70.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-resource-management</artifactId>
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/AssetLinkResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/AssetLinkResolver.java
index 795c88f3e..fb241229e 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/AssetLinkResolver.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/AssetLinkResolver.java
@@ -56,6 +56,7 @@ public class AssetLinkResolver {
exportConfig.setDataSources(new DataSourceResolver().resolve(getLinks(assetLinks, ResolvableAssetLinks.DATA_SOURCE)));
exportConfig.setPipelines(new PipelineResolver().resolve(getLinks(assetLinks, ResolvableAssetLinks.PIPELINE)));
exportConfig.setDataLakeMeasures(new MeasurementResolver().resolve(getLinks(assetLinks, ResolvableAssetLinks.MEASUREMENT)));
+ exportConfig.setFiles(new FileResolver().resolve(getLinks(assetLinks, ResolvableAssetLinks.FILE)));
return exportConfig;
} catch (IOException e) {
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/constants/ResolvableAssetLinks.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/constants/ResolvableAssetLinks.java
index d3c9cd721..35588dae3 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/constants/ResolvableAssetLinks.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/constants/ResolvableAssetLinks.java
@@ -26,4 +26,5 @@ public class ResolvableAssetLinks {
public static final String ADAPTER = "adapter";
public static final String DATA_SOURCE = "data-source";
public static final String PIPELINE = "pipeline";
+ public static final String FILE = "file";
}
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/ImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/ImportGenerator.java
index 105f99fc7..d2dbe2231 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/ImportGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/ImportGenerator.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.model.export.StreamPipesApplicationPackage;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
public abstract class ImportGenerator<T> {
@@ -40,7 +41,7 @@ public abstract class ImportGenerator<T> {
}
public T generate(InputStream inputStream) throws IOException {
- Map<String, String> previewFiles = new ZipFileExtractor(inputStream).extractZipToStringMap();
+ Map<String, byte[]> previewFiles = new ZipFileExtractor(inputStream).extractZipToMap();
var manifest = getManifest(previewFiles);
@@ -49,35 +50,39 @@ public abstract class ImportGenerator<T> {
}
for (String adapterId: manifest.getAdapters()) {
- handleAdapter(previewFiles.get(adapterId), adapterId);
+ handleAdapter(asString(previewFiles.get(adapterId)), adapterId);
}
for(String dashboardId: manifest.getDashboards()) {
- handleDashboard(previewFiles.get(dashboardId), dashboardId);
+ handleDashboard(asString(previewFiles.get(dashboardId)), dashboardId);
}
for (String dataViewId: manifest.getDataViews()) {
- handleDataView(previewFiles.get(dataViewId), dataViewId);
+ handleDataView(asString(previewFiles.get(dataViewId)), dataViewId);
}
for (String dataSourceId: manifest.getDataSources()) {
- handleDataSource(previewFiles.get(dataSourceId), dataSourceId);
+ handleDataSource(asString(previewFiles.get(dataSourceId)), dataSourceId);
}
for (String pipelineId: manifest.getPipelines()) {
- handlePipeline(previewFiles.get(pipelineId), pipelineId);
+ handlePipeline(asString(previewFiles.get(pipelineId)), pipelineId);
}
for (String measurementId: manifest.getDataLakeMeasures()) {
- handleDataLakeMeasure(previewFiles.get(measurementId), measurementId);
+ handleDataLakeMeasure(asString(previewFiles.get(measurementId)), measurementId);
}
for (String dashboardWidgetId: manifest.getDashboardWidgets()) {
- handleDashboardWidget(previewFiles.get(dashboardWidgetId), dashboardWidgetId);
+ handleDashboardWidget(asString(previewFiles.get(dashboardWidgetId)), dashboardWidgetId);
}
for (String dataViewWidgetId: manifest.getDataViewWidgets()) {
- handleDataViewWidget(previewFiles.get(dataViewWidgetId), dataViewWidgetId);
+ handleDataViewWidget(asString(previewFiles.get(dataViewWidgetId)), dataViewWidgetId);
+ }
+
+ for(String fileMetadataId: manifest.getFiles()) {
+ handleFile(asString(previewFiles.get(fileMetadataId)), fileMetadataId, previewFiles);
}
afterResourcesCreated();
@@ -85,11 +90,15 @@ public abstract class ImportGenerator<T> {
return getReturnObject();
}
- private StreamPipesApplicationPackage getManifest(Map<String, String> previewFiles) throws JsonProcessingException {
- return this.defaultMapper.readValue(previewFiles.get(ExportConstants.MANIFEST), StreamPipesApplicationPackage.class);
+ protected String asString(byte[] bytes) {
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ private StreamPipesApplicationPackage getManifest(Map<String, byte[]> previewFiles) throws JsonProcessingException {
+ return this.defaultMapper.readValue(asString(previewFiles.get(ExportConstants.MANIFEST)), StreamPipesApplicationPackage.class);
}
- protected abstract void handleAsset(Map<String, String> previewFiles, String assetId) throws IOException;
+ protected abstract void handleAsset(Map<String, byte[]> previewFiles, String assetId) throws IOException;
protected abstract void handleAdapter(String document, String adapterId) throws JsonProcessingException;
protected abstract void handleDashboard(String document, String dashboardId) throws JsonProcessingException;
protected abstract void handleDataView(String document, String dataViewId) throws JsonProcessingException;
@@ -97,7 +106,8 @@ public abstract class ImportGenerator<T> {
protected abstract void handlePipeline(String document, String pipelineId) throws JsonProcessingException;
protected abstract void handleDataLakeMeasure(String document, String dataLakeMeasureId) throws JsonProcessingException;
protected abstract void handleDashboardWidget(String document, String dashboardWidgetId) throws JsonProcessingException;
- protected abstract void handleDataViewWidget(String document, String dataViewWidget) throws JsonProcessingException;
+ protected abstract void handleDataViewWidget(String document, String dataViewWidgetId) throws JsonProcessingException;
+ protected abstract void handleFile(String document, String fileMetadataId, Map<String, byte[]> zipContent) throws IOException;
protected abstract T getReturnObject();
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
index cf4769aae..0c66afc3e 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.export.dataimport;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.export.model.PermissionInfo;
import org.apache.streampipes.export.resolver.*;
+import org.apache.streampipes.manager.file.FileHandler;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.dashboard.DashboardModel;
@@ -32,6 +33,7 @@ import org.apache.streampipes.resource.management.PermissionResourceManager;
import org.apache.streampipes.storage.api.INoSqlStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
@@ -51,10 +53,9 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
this.ownerSid = ownerSid;
}
-
@Override
- protected void handleAsset(Map<String, String> previewFiles, String assetId) throws IOException {
- storage.getGenericStorage().create(previewFiles.get(assetId));
+ protected void handleAsset(Map<String, byte[]> previewFiles, String assetId) throws IOException {
+ storage.getGenericStorage().create(asString(previewFiles.get(assetId)));
}
@Override
@@ -115,6 +116,17 @@ public class PerformImportGenerator extends ImportGenerator<Void> {
new DataViewWidgetResolver().writeDocument(document);
}
+ @Override
+ protected void handleFile(String document,
+ String fileMetadataId,
+ Map<String, byte[]> zipContent) throws IOException {
+ var resolver = new FileResolver();
+ var fileMetadata = resolver.readDocument(document);
+ resolver.writeDocument(document);
+ byte[] file = zipContent.get(fileMetadata.getInternalFilename().substring(0, fileMetadata.getInternalFilename().lastIndexOf(".")));
+ new FileHandler().storeFile(fileMetadata.getInternalFilename(), new ByteArrayInputStream(file));
+ }
+
@Override
protected Void getReturnObject() {
return null;
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java
index a4ace764a..80c489424 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java
@@ -46,8 +46,8 @@ public class PreviewImportGenerator extends ImportGenerator<AssetExportConfigura
@Override
- protected void handleAsset(Map<String, String> previewFiles, String assetId) throws JsonProcessingException {
- Map<String, Object> assetDescription = this.defaultMapper.readValue(previewFiles.get(assetId), new TypeReference<Map<String, Object>>() {});
+ protected void handleAsset(Map<String, byte[]> previewFiles, String assetId) throws JsonProcessingException {
+ Map<String, Object> assetDescription = this.defaultMapper.readValue(asString(previewFiles.get(assetId)), new TypeReference<Map<String, Object>>() {});
importConfig.addAsset(new ExportItem(assetId, String.valueOf(assetDescription.get("assetName")), true));
}
@@ -91,6 +91,13 @@ public class PreviewImportGenerator extends ImportGenerator<AssetExportConfigura
}
+ @Override
+ protected void handleFile(String document,
+ String fileMetadataId,
+ Map<String, byte[]> zipContent) throws JsonProcessingException {
+ addExportItem(fileMetadataId, new FileResolver().readDocument(document).getOriginalFilename(), importConfig::addFile);
+ }
+
@Override
protected AssetExportConfiguration getReturnObject() {
return this.importConfig;
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ExportPackageGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ExportPackageGenerator.java
index 5070423ba..3350e69e8 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ExportPackageGenerator.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ExportPackageGenerator.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampipes.export.resolver.*;
import org.apache.streampipes.export.utils.SerializationUtils;
+import org.apache.streampipes.manager.file.FileManager;
import org.apache.streampipes.model.export.AssetExportConfiguration;
import org.apache.streampipes.model.export.ExportConfiguration;
import org.apache.streampipes.model.export.ExportItem;
@@ -29,6 +30,7 @@ import org.apache.streampipes.model.export.StreamPipesApplicationPackage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -101,6 +103,17 @@ public class ExportPackageGenerator {
var widgetResolver = new DataViewWidgetResolver();
widgets.forEach(widgetId -> addDoc(builder, widgetId, widgetResolver, manifest::addDataViewWidget));
});
+
+ config.getFiles().forEach(item -> {
+ var fileResolver = new FileResolver();
+ String filename = fileResolver.findDocument(item.getResourceId()).getInternalFilename();
+ addDoc(builder, item, new FileResolver(), manifest::addFile);
+ try {
+ builder.addBinary(filename, Files.readAllBytes(FileManager.getFile(filename).toPath()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
});
builder.addManifest(defaultMapper.writeValueAsString(manifest));
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ZipFileBuilder.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ZipFileBuilder.java
index 58e8948bb..59edaa8b4 100644
--- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ZipFileBuilder.java
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/generator/ZipFileBuilder.java
@@ -86,6 +86,10 @@ public class ZipFileBuilder {
addZipEntry(documentKey + ".json", document, out, buffer);
}
+ for(String binary : this.binaryEntries.keySet()) {
+ addZipEntry(binary, this.binaryEntries.get(binary), out, buffer);
+ }
+
addZipEntry(ExportConstants.MANIFEST + ".json", asBytes(manifest), out, buffer);
out.closeEntry();
out.close();
diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/FileResolver.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/FileResolver.java
new file mode 100644
index 000000000..bd967fc90
--- /dev/null
+++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/FileResolver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.export.resolver;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.export.utils.SerializationUtils;
+import org.apache.streampipes.model.export.ExportItem;
+import org.apache.streampipes.model.file.FileMetadata;
+
+public class FileResolver extends AbstractResolver<FileMetadata> {
+
+ @Override
+ public FileMetadata findDocument(String resourceId) {
+ var doc = getNoSqlStore().getFileMetadataStorage().getMetadataById(resourceId);
+ doc.setRev(null);
+ return doc;
+ }
+
+ @Override
+ public FileMetadata readDocument(String serializedDoc) throws JsonProcessingException {
+ return SerializationUtils.getSpObjectMapper().readValue(serializedDoc, FileMetadata.class);
+ }
+
+ @Override
+ public ExportItem convert(FileMetadata document) {
+ return new ExportItem(document.getFileId(), document.getOriginalFilename(), true);
+ }
+
+ @Override
+ public void writeDocument(String document) throws JsonProcessingException {
+ getNoSqlStore().getFileMetadataStorage().addFileMetadata(deserializeDocument(document));
+ }
+
+ @Override
+ protected FileMetadata deserializeDocument(String document) throws JsonProcessingException {
+ return SerializationUtils.getSpObjectMapper().readValue(document, FileMetadata.class);
+ }
+}
diff --git a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
index bec004e3f..7be850b07 100644
--- a/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
+++ b/ui/src/app/configuration/export/export-dialog/data-export-dialog.component.html
@@ -26,6 +26,7 @@
<sp-data-export-item [exportItems]="config.dataViews" sectionTitle="Data Views"></sp-data-export-item>
<sp-data-export-item [exportItems]="config.dataSources" sectionTitle="Data Sources"></sp-data-export-item>
<sp-data-export-item [exportItems]="config.dataLakeMeasures" sectionTitle="Data Lake Storage"></sp-data-export-item>
+ <sp-data-export-item [exportItems]="config.files" sectionTitle="Files"></sp-data-export-item>
<sp-data-export-item [exportItems]="config.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
</div>
</div>
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
index 206347f33..01bc6c817 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.html
@@ -51,6 +51,7 @@
<sp-data-export-item [exportItems]="importConfiguration.dataViews" sectionTitle="Data Views"></sp-data-export-item>
<sp-data-export-item [exportItems]="importConfiguration.dataSources" sectionTitle="Data Sources"></sp-data-export-item>
<sp-data-export-item [exportItems]="importConfiguration.dataLakeMeasures" sectionTitle="Data Lake Storage"></sp-data-export-item>
+ <sp-data-export-item [exportItems]="importConfiguration.files" sectionTitle="Files"></sp-data-export-item>
<sp-data-export-item [exportItems]="importConfiguration.pipelines" sectionTitle="Pipelines"></sp-data-export-item>
</div>
</div>
diff --git a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
index 4a8b4cd65..f8da6c2ac 100644
--- a/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
+++ b/ui/src/app/configuration/export/import-dialog/data-import-dialog.component.ts
@@ -55,7 +55,6 @@ export class SpDataImportDialogComponent implements OnInit {
this.selectedUploadFile = files[0];
this.fileName = this.selectedUploadFile.name;
this.uploadStatus = 0;
- console.log(this.hasInput);
}
performPreview(): void {