You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/15 18:51:05 UTC
[incubator-streampipes-extensions] branch dev updated:
[STREAMPIPES-217] Update SDK methods to reflect changes in file management
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 0ba9698 [STREAMPIPES-217] Update SDK methods to reflect changes in file management
0ba9698 is described below
commit 0ba969831f700edb46361c52893171a19ce588bd
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Sep 15 20:45:55 2020 +0200
[STREAMPIPES-217] Update SDK methods to reflect changes in file management
---
.../connect/adapters/image/ImageZipAdapter.java | 11 +--
.../adapters/image/ZipFileImageIterator.java | 48 +++++++----
.../adapters/image/set/ImageSetAdapter.java | 3 +-
.../adapters/image/stream/ImageStreamAdapter.java | 3 +-
.../connect/adapters/plc4x/s7/Plc4xS7Adapter.java | 93 ++++++++++------------
.../connect/protocol/set/FileProtocol.java | 67 +++++++---------
.../protocol/stream/FileStreamProtocol.java | 71 +++++++----------
.../CsvMetadataEnrichmentController.java | 11 +--
8 files changed, 140 insertions(+), 167 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
index 0c5b44e..7bda6c3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
@@ -22,13 +22,11 @@ import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -55,16 +53,13 @@ public class ImageZipAdapter {
*/
public void start(AdapterPipeline adapterPipeline, boolean infinite) throws AdapterException {
StaticPropertyExtractor extractor =
- StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
-
- FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ImageZipUtils.ZIP_FILE_KEY);
- String fileUri = fileStaticProperty.getLocationPath();
+ StaticPropertyExtractor.from(adapterDescription.getConfig());
Integer timeBetweenReplay = extractor.singleValueParameter(ImageZipUtils.INTERVAL_KEY, Integer.class);
-
+ String zipFileUrl = extractor.selectedFileFetchUrl(ImageZipUtils.ZIP_FILE_KEY);
ZipFileImageIterator zipFileImageIterator;
try {
- zipFileImageIterator = new ZipFileImageIterator(fileUri, infinite);
+ zipFileImageIterator = new ZipFileImageIterator(zipFileUrl, infinite);
} catch (IOException e) {
throw new AdapterException("Error while reading images in the zip file");
}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
index 4434362..cd08441 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
@@ -18,37 +18,37 @@
package org.apache.streampipes.connect.adapters.image;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.client.fluent.Request;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
public class ZipFileImageIterator {
- private ZipFile zipFile;
- private List<ZipEntry> allImages;
+
+ private static final int BUFFER_SIZE = 4096;
+
+ private List<byte[]> allImages;
private int current;
/* Defines whether the iterator starts from the beginning or not */
private boolean infinite;
- public ZipFileImageIterator(String zipFileRoute, boolean infinite) throws IOException {
- this.zipFile = new ZipFile(zipFileRoute);
+ public ZipFileImageIterator(String zipFileUrl, boolean infinite) throws IOException {
+ ZipInputStream inputStream = fetchZipInputStream(zipFileUrl);
this.infinite = infinite;
- Enumeration<? extends ZipEntry> entries = zipFile.entries();
-
+ ZipEntry entry;
this.allImages = new ArrayList<>();
- while(entries.hasMoreElements()){
- ZipEntry entry = entries.nextElement();
+ while((entry = inputStream.getNextEntry()) != null) {
if (isImage(entry.getName())){
- allImages.add(entry);
+ allImages.add(extractFile(inputStream));
}
}
this.current = 0;
@@ -68,9 +68,7 @@ public class ZipFileImageIterator {
}
}
- ZipEntry entry = allImages.get(current);
- InputStream stream = zipFile.getInputStream(entry);
- byte[] bytes = IOUtils.toByteArray(stream);
+ byte[] bytes = allImages.get(current);
current++;
String resultImage = Base64.getEncoder().encodeToString(bytes);
@@ -84,4 +82,22 @@ public class ZipFileImageIterator {
name.toLowerCase().endsWith(".jpeg"));
}
+
+ private ZipInputStream fetchZipInputStream(String fileFetchUrl) throws IOException {
+ return new ZipInputStream(Request.Get(fileFetchUrl).execute().returnContent().asStream());
+ }
+
+ private byte[] extractFile(ZipInputStream zipIn) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ BufferedOutputStream bos = new BufferedOutputStream(outputStream);
+ byte[] bytesIn = new byte[BUFFER_SIZE];
+ int read = 0;
+ while ((read = zipIn.read(bytesIn)) != -1) {
+ bos.write(bytesIn, 0, read);
+ }
+ bos.close();
+ byte[] bytes = outputStream.toByteArray();
+ outputStream.close();
+ return bytes;
+ }
}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
index a4f68f8..dc7e0d8 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescriptio
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataSetAdapterBuilder;
+import org.apache.streampipes.sdk.helpers.Filetypes;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
@@ -59,7 +60,7 @@ public class ImageSetAdapter extends SpecificDataSetAdapter {
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
- .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
+ .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY), Filetypes.ZIP)
.build();
description.setAppId(ID);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
index 77be1be..9336e81 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescrip
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.helpers.Filetypes;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
@@ -58,7 +59,7 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
- .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
+ .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY), Filetypes.ZIP)
.build();
description.setAppId(ID);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
index e27cfaa..bdad565 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -18,11 +18,12 @@
package org.apache.streampipes.connect.adapters.plc4x.s7;
-import com.poiji.bind.Poiji;
import com.opencsv.CSVReader;
import com.opencsv.bean.CsvToBean;
import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.HeaderColumnNameTranslateMappingStrategy;
+import com.poiji.bind.Poiji;
+import com.poiji.exception.PoijiExcelType;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -33,38 +34,33 @@ import org.apache.streampipes.connect.adapter.Adapter;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.adapter.util.PollingSettings;
import org.apache.streampipes.connect.adapters.PullAdapter;
-import org.apache.streampipes.connect.protocol.stream.KafkaProtocol;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.io.File;
public class Plc4xS7Adapter extends PullAdapter {
@@ -136,9 +132,9 @@ public class Plc4xS7Adapter extends PullAdapter {
StaticProperties.singleValueSelection(Labels.withId(PLC_NODE_TYPE),
Options.from("Bool", "Byte", "Int", "Word", "Real")))),
Alternatives.from(Labels.withId(CSV_IMPORT),
- StaticProperties.fileProperty(Labels.withId(PLC_NODES_CSV_FILE))),
+ StaticProperties.fileProperty(Labels.withId(PLC_NODES_CSV_FILE), Filetypes.CSV)),
Alternatives.from(Labels.withId(EXCEL_IMPORT),
- StaticProperties.fileProperty(Labels.withId(PLC_NODES_EXCEL_FILE))))
+ StaticProperties.fileProperty(Labels.withId(PLC_NODES_EXCEL_FILE), Filetypes.XLSX, Filetypes.XLS)))
.build();
description.setAppId(ID);
@@ -240,12 +236,9 @@ public class Plc4xS7Adapter extends PullAdapter {
// publish the final event
adapterPipeline.process(event);
- } catch (InterruptedException e) {
- LOG.error(e.getMessage());
- } catch (ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage());
}
-
}
/**
@@ -291,35 +284,23 @@ public class Plc4xS7Adapter extends PullAdapter {
String selectedAlternative = extractor.selectedAlternativeInternalId(CONFIGURE);
if (selectedAlternative.equals(CSV_IMPORT)) {
// CSV file
- FileStaticProperty sp = (FileStaticProperty) extractor.getStaticPropertyByName(PLC_NODES_CSV_FILE);
- this.nodes = new ArrayList<>();
try {
- List<S7ConfigFile> configFiles = this.getCsvConfig(sp.getLocationPath());
- for (S7ConfigFile entry : configFiles) {
- Map map = new HashMap();
- map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
- map.put(PLC_NODE_NAME, entry.getLogicalAddress());
- map.put(PLC_NODE_TYPE, entry.getDataType());
- this.nodes.add(map);
- }
- } catch (FileNotFoundException e) {
+ String csvFileContent = extractor.fileContentsAsString(PLC_NODES_CSV_FILE);
+ List<S7ConfigFile> configFiles = this.getCsvConfig(csvFileContent);
+ this.nodes = makeConfigMap(configFiles);
+ } catch (IOException e) {
throw new AdapterException("Could not read imported file");
}
} else if (selectedAlternative.equals(EXCEL_IMPORT)) {
// Excel file
- FileStaticProperty sp = (FileStaticProperty) extractor.getStaticPropertyByName(PLC_NODES_EXCEL_FILE);
- this.nodes = new ArrayList<>();
try {
- List<S7ConfigFile> configFiles = this.getExcelConfig(sp.getLocationPath());
- for (S7ConfigFile entry : configFiles) {
- Map map = new HashMap();
- map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
- map.put(PLC_NODE_NAME, entry.getLogicalAddress());
- map.put(PLC_NODE_TYPE, entry.getDataType());
- this.nodes.add(map);
- }
- } catch (FileNotFoundException e) {
+ InputStream is = extractor.fileContentsAsStream(PLC_NODES_EXCEL_FILE);
+ String excelFilename = extractor.selectedFilename(PLC_NODES_CSV_FILE);
+ List<S7ConfigFile> configFiles = this.getExcelConfig(is, excelFilename);
+ this.nodes = makeConfigMap(configFiles);
+ is.close();
+ } catch (IOException e) {
throw new AdapterException("Could not read imported file");
}
@@ -331,7 +312,7 @@ public class Plc4xS7Adapter extends PullAdapter {
for (StaticProperty member : sp.getMembers()) {
StaticPropertyExtractor memberExtractor =
StaticPropertyExtractor.from(((StaticPropertyGroup) member).getStaticProperties(), new ArrayList<>());
- Map map = new HashMap();
+ Map<String, String> map = new HashMap<>();
map.put(PLC_NODE_RUNTIME_NAME, memberExtractor.textParameter(PLC_NODE_RUNTIME_NAME));
map.put(PLC_NODE_NAME, memberExtractor.textParameter(PLC_NODE_NAME));
map.put(PLC_NODE_TYPE, memberExtractor.selectedSingleValue(PLC_NODE_TYPE, String.class));
@@ -340,35 +321,43 @@ public class Plc4xS7Adapter extends PullAdapter {
}
}
+ private List<Map<String, String>> makeConfigMap(List<S7ConfigFile> configFiles) {
+ List<Map<String, String>> nodes = new ArrayList<>();
+ for (S7ConfigFile entry : configFiles) {
+ Map<String, String> map = new HashMap<>();
+ map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
+ map.put(PLC_NODE_NAME, entry.getLogicalAddress());
+ map.put(PLC_NODE_TYPE, entry.getDataType());
+ nodes.add(map);
+ }
- private List<S7ConfigFile> getExcelConfig(String path) throws FileNotFoundException {
- List<S7ConfigFile> configFiles = Poiji.fromExcel(new File(path), S7ConfigFile.class);
- return configFiles;
+ return nodes;
}
- private List<S7ConfigFile> getCsvConfig(String path) throws FileNotFoundException {
+ private List<S7ConfigFile> getExcelConfig(InputStream is, String excelFilename) {
+ PoijiExcelType excelType = excelFilename.endsWith("xlsx") ? PoijiExcelType.XLSX : PoijiExcelType.XLS;
+ return Poiji.fromExcel(is, excelType, S7ConfigFile.class);
+ }
- FileReader fr = new FileReader(path);
- CSVReader reader = new CSVReader(fr, ';');
+ private List<S7ConfigFile> getCsvConfig(String fileContents) {
+ CSVReader reader = new CSVReader(new StringReader(fileContents), ';');
- Map<String, String> mapping = new
- HashMap<String, String>();
+ Map<String, String> mapping = new HashMap<>();
mapping.put("Name", "name");
mapping.put("Logical Address", "logicalAddress");
mapping.put("Data Type", "dataType");
- HeaderColumnNameTranslateMappingStrategy strategy =
- new HeaderColumnNameTranslateMappingStrategy();
+ HeaderColumnNameTranslateMappingStrategy<S7ConfigFile> strategy =
+ new HeaderColumnNameTranslateMappingStrategy<>();
strategy.setType(S7ConfigFile.class);
strategy.setColumnMapping(mapping);
- CsvToBean<S7ConfigFile> csvToBean = new CsvToBeanBuilder(reader)
+ CsvToBean<S7ConfigFile> csvToBean = new CsvToBeanBuilder<S7ConfigFile>(reader)
.withType(S7ConfigFile.class)
.withMappingStrategy(strategy)
.build();
- List<S7ConfigFile> configFiles = csvToBean.parse();
- return configFiles;
+ return csvToBean.parse();
}
/**
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
index 3684623..062ef49 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
@@ -19,10 +19,7 @@
package org.apache.streampipes.connect.protocol.set;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.http.client.fluent.Request;
import org.apache.streampipes.connect.SendToPipeline;
import org.apache.streampipes.connect.adapter.exception.ParseException;
import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
@@ -30,17 +27,24 @@ import org.apache.streampipes.connect.adapter.model.generic.Format;
import org.apache.streampipes.connect.adapter.model.generic.Parser;
import org.apache.streampipes.connect.adapter.model.generic.Protocol;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Filetypes;
import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -51,14 +55,14 @@ public class FileProtocol extends Protocol {
public static final String ID = "org.apache.streampipes.protocol.set.file";
- private String fileUri;
+ private String fileFetchUrl;
public FileProtocol() {
}
- public FileProtocol(Parser parser, Format format, String fileUri) {
+ public FileProtocol(Parser parser, Format format, String fileFetchUrl) {
super(parser, format);
- this.fileUri = fileUri;
+ this.fileFetchUrl = fileFetchUrl;
}
@Override
@@ -68,18 +72,16 @@ public class FileProtocol extends Protocol {
.withLocales(Locales.EN)
.sourceType(AdapterSourceType.SET)
.category(AdapterType.Generic)
- .requiredFile(Labels.withId("filePath"))
+ .requiredFile(Labels.withId("filePath"), Filetypes.XML, Filetypes.JSON, Filetypes.CSV)
.build();
}
@Override
public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
- ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
-
- FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName("filePath");
+ StaticPropertyExtractor extractor = StaticPropertyExtractor.from(protocolDescription.getConfig());
- String fileUri = fileStaticProperty.getLocationPath();
- return new FileProtocol(parser, format, fileUri);
+ String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
+ return new FileProtocol(parser, format, fileFetchUrl);
}
@Override
@@ -95,18 +97,11 @@ public class FileProtocol extends Protocol {
SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
try {
- fr = new FileReader(fileUri);
- BufferedReader br = new BufferedReader(fr);
-
- InputStream inn = new FileInputStream(fileUri);
- parser.parse(inn, stk);
-
- fr.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
+ InputStream in = Request.Get(fileFetchUrl).execute().returnContent().asStream();;
+ parser.parse(in, stk);
} catch (IOException e) {
e.printStackTrace();
- } catch (ParseException e) {
+ } catch (ParseException e) {
logger.error("Error while parsing: " + e.getMessage());
}
}
@@ -152,25 +147,17 @@ public class FileProtocol extends Protocol {
// return result;
}
-
public InputStream getDataFromEndpoint() throws ParseException {
- FileReader fr = null;
- InputStream inn = null;
-
try {
- fr = new FileReader(fileUri);
- BufferedReader br = new BufferedReader(fr);
-
- inn = new FileInputStream(fileUri);
-
+ return Request.Get(fileFetchUrl).execute().returnContent().asStream();
} catch (FileNotFoundException e) {
- throw new ParseException("File not found: " + fileUri);
+ throw new ParseException("File not found: " + fileFetchUrl);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new ParseException("Could not receive Data from file: " + fileFetchUrl);
}
- if (inn == null)
- throw new ParseException("Could not receive Data from file: " + fileUri);
-
- return inn;
}
+
@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
List<Map<String, Object>> result = new ArrayList<>();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
index d29b2dc..bfbdb56 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
@@ -18,10 +18,7 @@
package org.apache.streampipes.connect.protocol.stream;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.http.client.fluent.Request;
import org.apache.streampipes.connect.SendToPipeline;
import org.apache.streampipes.connect.adapter.exception.ParseException;
import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
@@ -31,19 +28,23 @@ import org.apache.streampipes.connect.adapter.model.generic.Protocol;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaReplayAdapterSink;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.*;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
-import org.apache.streampipes.sdk.helpers.AdapterSourceType;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
public class FileStreamProtocol extends Protocol {
@@ -51,7 +52,8 @@ public class FileStreamProtocol extends Protocol {
public static final String ID = "org.apache.streampipes.connect.protocol.stream.file";
- private String filePath;
+ //private String filePath;
+ private String fileFetchUrl;
// private String timestampKey;
private boolean replaceTimestamp;
private float speedUp;
@@ -64,10 +66,10 @@ public class FileStreamProtocol extends Protocol {
public FileStreamProtocol() {
}
- public FileStreamProtocol(Parser parser, Format format, String filePath,
+ public FileStreamProtocol(Parser parser, Format format, String fileFetchUrl,
boolean replaceTimestamp, float speedUp, int timeBetweenReplay) {
super(parser, format);
- this.filePath = filePath;
+ this.fileFetchUrl = fileFetchUrl;
this.replaceTimestamp = replaceTimestamp;
this.speedUp = speedUp;
this.timeBetweenReplay = timeBetweenReplay;
@@ -88,10 +90,10 @@ public class FileStreamProtocol extends Protocol {
replaceTimestamp, speedUp));
format.reset();
SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
- InputStream data = getDataFromEndpoint();
+ InputStream dataInputStream = getDataFromEndpoint();
try {
- if(data != null) {
- parser.parse(data, stk);
+ if(dataInputStream != null) {
+ parser.parse(dataInputStream, stk);
} else {
logger.warn("Could not read data from file.");
}
@@ -116,43 +118,28 @@ public class FileStreamProtocol extends Protocol {
running = false;
}
- InputStream getDataFromEndpoint() throws ParseException {
- FileReader fr = null;
- InputStream inn = null;
+ private InputStream getDataFromEndpoint() throws ParseException {
try {
-
- fr = new FileReader(filePath);
- BufferedReader br = new BufferedReader(fr);
-
- inn = new FileInputStream(filePath);
-
- } catch (FileNotFoundException e) {
- throw new ParseException("Could not find file: " + filePath);
+ return Request.Get(fileFetchUrl).execute().returnContent().asStream();
+ } catch (IOException e) {
+ throw new ParseException("Could not find file: " + fileFetchUrl);
}
-
- if (inn == null)
- throw new ParseException("Could not receive Data from file: " + filePath);
-
-
- return inn;
}
@Override
public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
- ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
+ StaticPropertyExtractor extractor = StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());
- List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp");
+ List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
// String replaceTimestampString = extractor.selectedSingleValueOption("replaceTimestamp");
boolean replaceTimestamp = replaceTimestampStringList.size() == 0 ? false : true;
- float speedUp = Float.parseFloat(extractor.singleValue("speed"));
+ float speedUp = extractor.singleValueParameter("speed", Float.class);
int timeBetweenReplay = 1;
- FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName("filePath");
-
- String fileUri = fileStaticProperty.getLocationPath();
- return new FileStreamProtocol(parser, format, fileUri, replaceTimestamp, speedUp, timeBetweenReplay);
+ String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
+ return new FileStreamProtocol(parser, format, fileFetchUrl, replaceTimestamp, speedUp, timeBetweenReplay);
}
private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
@@ -185,7 +172,7 @@ public class FileStreamProtocol extends Protocol {
.withLocales(Locales.EN)
.sourceType(AdapterSourceType.STREAM)
.category(AdapterType.Generic)
- .requiredFile(Labels.withId("filePath"))
+ .requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
// .requiredSingleValueSelection(Labels.withId("replaceTimestamp"),
// Options.from("True", "False"))
.requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
index f7e47f9..a0ea2ba 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
@@ -17,8 +17,6 @@
*/
package org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata;
-import static org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
-
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -34,10 +32,7 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
@@ -49,6 +44,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
+
public class CsvMetadataEnrichmentController
extends StandaloneEventProcessingDeclarer<CsvMetadataEnrichmentParameters>
implements ResolvesContainerProvidedOptions,
@@ -71,7 +68,7 @@ public class CsvMetadataEnrichmentController
Labels.withId(MAPPING_FIELD_KEY),
PropertyScope.NONE)
.build())
- .requiredFile(Labels.withId(CSV_FILE_KEY))
+ .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
.requiredSingleValueSelectionFromContainer(Labels.withId(FIELD_TO_MATCH),
Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY))
.requiredMultiValueSelectionFromContainer(Labels.withId(FIELDS_TO_APPEND_KEY),