You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/15 18:51:05 UTC

[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-217] Update SDK methods to reflect changes in file management

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0ba9698  [STREAMPIPES-217] Update SDK methods to reflect changes in file management
0ba9698 is described below

commit 0ba969831f700edb46361c52893171a19ce588bd
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Sep 15 20:45:55 2020 +0200

    [STREAMPIPES-217] Update SDK methods to reflect changes in file management
---
 .../connect/adapters/image/ImageZipAdapter.java    | 11 +--
 .../adapters/image/ZipFileImageIterator.java       | 48 +++++++----
 .../adapters/image/set/ImageSetAdapter.java        |  3 +-
 .../adapters/image/stream/ImageStreamAdapter.java  |  3 +-
 .../connect/adapters/plc4x/s7/Plc4xS7Adapter.java  | 93 ++++++++++------------
 .../connect/protocol/set/FileProtocol.java         | 67 +++++++---------
 .../protocol/stream/FileStreamProtocol.java        | 71 +++++++----------
 .../CsvMetadataEnrichmentController.java           | 11 +--
 8 files changed, 140 insertions(+), 167 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
index 0c5b44e..7bda6c3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
@@ -22,13 +22,11 @@ import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -55,16 +53,13 @@ public class ImageZipAdapter {
      */
     public void start(AdapterPipeline adapterPipeline, boolean infinite) throws AdapterException {
         StaticPropertyExtractor extractor =
-                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
-
-        FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ImageZipUtils.ZIP_FILE_KEY);
-        String fileUri = fileStaticProperty.getLocationPath();
+                StaticPropertyExtractor.from(adapterDescription.getConfig());
 
         Integer timeBetweenReplay = extractor.singleValueParameter(ImageZipUtils.INTERVAL_KEY, Integer.class);
-
+        String zipFileUrl = extractor.selectedFileFetchUrl(ImageZipUtils.ZIP_FILE_KEY);
         ZipFileImageIterator zipFileImageIterator;
         try {
-            zipFileImageIterator = new ZipFileImageIterator(fileUri, infinite);
+            zipFileImageIterator = new ZipFileImageIterator(zipFileUrl, infinite);
         } catch (IOException e) {
             throw new AdapterException("Error while reading images in the zip file");
         }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
index 4434362..cd08441 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
@@ -18,37 +18,37 @@
 
 package org.apache.streampipes.connect.adapters.image;
 
-import org.apache.commons.io.IOUtils;
+import org.apache.http.client.fluent.Request;
 
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Enumeration;
 import java.util.List;
 import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
 
 public class ZipFileImageIterator {
-    private ZipFile zipFile;
-    private List<ZipEntry> allImages;
+
+    private static final int BUFFER_SIZE = 4096;
+
+    private List<byte[]> allImages;
     private int current;
 
     /* Defines whether the iterator starts from the beginning or not */
     private boolean infinite;
 
-    public ZipFileImageIterator(String zipFileRoute, boolean infinite) throws IOException {
-        this.zipFile = new ZipFile(zipFileRoute);
+    public ZipFileImageIterator(String zipFileUrl, boolean infinite) throws IOException {
+        ZipInputStream inputStream = fetchZipInputStream(zipFileUrl);
         this.infinite = infinite;
 
-        Enumeration<? extends ZipEntry> entries = zipFile.entries();
-
+        ZipEntry entry;
         this.allImages = new ArrayList<>();
 
-        while(entries.hasMoreElements()){
-            ZipEntry entry = entries.nextElement();
+        while((entry = inputStream.getNextEntry()) != null) {
             if (isImage(entry.getName())){
-                allImages.add(entry);
+                allImages.add(extractFile(inputStream));
             }
         }
         this.current = 0;
@@ -68,9 +68,7 @@ public class ZipFileImageIterator {
             }
         }
 
-        ZipEntry entry = allImages.get(current);
-        InputStream stream = zipFile.getInputStream(entry);
-        byte[] bytes = IOUtils.toByteArray(stream);
+        byte[] bytes = allImages.get(current);
 
         current++;
         String resultImage = Base64.getEncoder().encodeToString(bytes);
@@ -84,4 +82,22 @@ public class ZipFileImageIterator {
                         name.toLowerCase().endsWith(".jpeg"));
 
     }
+
+    private ZipInputStream fetchZipInputStream(String fileFetchUrl) throws IOException {
+        return new ZipInputStream(Request.Get(fileFetchUrl).execute().returnContent().asStream());
+    }
+
+    private byte[] extractFile(ZipInputStream zipIn) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BufferedOutputStream bos = new BufferedOutputStream(outputStream);
+        byte[] bytesIn = new byte[BUFFER_SIZE];
+        int read = 0;
+        while ((read = zipIn.read(bytesIn)) != -1) {
+            bos.write(bytesIn, 0, read);
+        }
+        bos.close();
+        byte[] bytes = outputStream.toByteArray();
+        outputStream.close();
+        return bytes;
+    }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
index a4f68f8..dc7e0d8 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/set/ImageSetAdapter.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescriptio
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataSetAdapterBuilder;
+import org.apache.streampipes.sdk.helpers.Filetypes;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
@@ -59,7 +60,7 @@ public class ImageSetAdapter extends SpecificDataSetAdapter {
                 .withLocales(Locales.EN)
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
                 .requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
-                .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
+                .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY), Filetypes.ZIP)
                 .build();
         description.setAppId(ID);
 
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
index 77be1be..9336e81 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescrip
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.helpers.Filetypes;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
@@ -58,7 +59,7 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
                 .withLocales(Locales.EN)
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
                 .requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
-                .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
+                .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY), Filetypes.ZIP)
                 .build();
         description.setAppId(ID);
 
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
index e27cfaa..bdad565 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/plc4x/s7/Plc4xS7Adapter.java
@@ -18,11 +18,12 @@
 
 package org.apache.streampipes.connect.adapters.plc4x.s7;
 
-import com.poiji.bind.Poiji;
 import com.opencsv.CSVReader;
 import com.opencsv.bean.CsvToBean;
 import com.opencsv.bean.CsvToBeanBuilder;
 import com.opencsv.bean.HeaderColumnNameTranslateMappingStrategy;
+import com.poiji.bind.Poiji;
+import com.poiji.exception.PoijiExcelType;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -33,38 +34,33 @@ import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.util.PollingSettings;
 import org.apache.streampipes.connect.adapters.PullAdapter;
-import org.apache.streampipes.connect.protocol.stream.KafkaProtocol;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
-import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.io.File;
 
 public class Plc4xS7Adapter extends PullAdapter {
 
@@ -136,9 +132,9 @@ public class Plc4xS7Adapter extends PullAdapter {
                                         StaticProperties.singleValueSelection(Labels.withId(PLC_NODE_TYPE),
                                                 Options.from("Bool",  "Byte", "Int", "Word", "Real")))),
                         Alternatives.from(Labels.withId(CSV_IMPORT),
-                                StaticProperties.fileProperty(Labels.withId(PLC_NODES_CSV_FILE))),
+                                StaticProperties.fileProperty(Labels.withId(PLC_NODES_CSV_FILE), Filetypes.CSV)),
                         Alternatives.from(Labels.withId(EXCEL_IMPORT),
-                                StaticProperties.fileProperty(Labels.withId(PLC_NODES_EXCEL_FILE))))
+                                StaticProperties.fileProperty(Labels.withId(PLC_NODES_EXCEL_FILE), Filetypes.XLSX, Filetypes.XLS)))
                 .build();
         description.setAppId(ID);
 
@@ -240,12 +236,9 @@ public class Plc4xS7Adapter extends PullAdapter {
 
             // publish the final event
             adapterPipeline.process(event);
-        } catch (InterruptedException e) {
-            LOG.error(e.getMessage());
-        } catch (ExecutionException e) {
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error(e.getMessage());
         }
-
     }
 
     /**
@@ -291,35 +284,23 @@ public class Plc4xS7Adapter extends PullAdapter {
         String selectedAlternative = extractor.selectedAlternativeInternalId(CONFIGURE);
         if (selectedAlternative.equals(CSV_IMPORT)) {
             // CSV file
-            FileStaticProperty sp = (FileStaticProperty) extractor.getStaticPropertyByName(PLC_NODES_CSV_FILE);
-            this.nodes = new ArrayList<>();
             try {
-                List<S7ConfigFile> configFiles = this.getCsvConfig(sp.getLocationPath());
-                for (S7ConfigFile entry : configFiles) {
-                    Map map = new HashMap();
-                    map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
-                    map.put(PLC_NODE_NAME, entry.getLogicalAddress());
-                    map.put(PLC_NODE_TYPE, entry.getDataType());
-                    this.nodes.add(map);
-                }
-            } catch (FileNotFoundException e) {
+                String csvFileContent = extractor.fileContentsAsString(PLC_NODES_CSV_FILE);
+                List<S7ConfigFile> configFiles = this.getCsvConfig(csvFileContent);
+                this.nodes = makeConfigMap(configFiles);
+            } catch (IOException e) {
                 throw new AdapterException("Could not read imported file");
             }
 
         } else if (selectedAlternative.equals(EXCEL_IMPORT)) {
             // Excel file
-            FileStaticProperty sp = (FileStaticProperty) extractor.getStaticPropertyByName(PLC_NODES_EXCEL_FILE);
-            this.nodes = new ArrayList<>();
             try {
-                List<S7ConfigFile> configFiles = this.getExcelConfig(sp.getLocationPath());
-                for (S7ConfigFile entry : configFiles) {
-                    Map map = new HashMap();
-                    map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
-                    map.put(PLC_NODE_NAME, entry.getLogicalAddress());
-                    map.put(PLC_NODE_TYPE, entry.getDataType());
-                    this.nodes.add(map);
-                }
-            } catch (FileNotFoundException e) {
+                InputStream is = extractor.fileContentsAsStream(PLC_NODES_EXCEL_FILE);
+                String excelFilename = extractor.selectedFilename(PLC_NODES_CSV_FILE);
+                List<S7ConfigFile> configFiles = this.getExcelConfig(is, excelFilename);
+                this.nodes = makeConfigMap(configFiles);
+                is.close();
+            } catch (IOException e) {
                 throw new AdapterException("Could not read imported file");
             }
 
@@ -331,7 +312,7 @@ public class Plc4xS7Adapter extends PullAdapter {
             for (StaticProperty member : sp.getMembers()) {
                 StaticPropertyExtractor memberExtractor =
                         StaticPropertyExtractor.from(((StaticPropertyGroup) member).getStaticProperties(), new ArrayList<>());
-                Map map = new HashMap();
+                Map<String, String> map = new HashMap<>();
                 map.put(PLC_NODE_RUNTIME_NAME, memberExtractor.textParameter(PLC_NODE_RUNTIME_NAME));
                 map.put(PLC_NODE_NAME, memberExtractor.textParameter(PLC_NODE_NAME));
                 map.put(PLC_NODE_TYPE, memberExtractor.selectedSingleValue(PLC_NODE_TYPE, String.class));
@@ -340,35 +321,43 @@ public class Plc4xS7Adapter extends PullAdapter {
         }
     }
 
+    private List<Map<String, String>> makeConfigMap(List<S7ConfigFile> configFiles) {
+        List<Map<String, String>> nodes = new ArrayList<>();
+        for (S7ConfigFile entry : configFiles) {
+            Map<String, String> map = new HashMap<>();
+            map.put(PLC_NODE_RUNTIME_NAME, entry.getName());
+            map.put(PLC_NODE_NAME, entry.getLogicalAddress());
+            map.put(PLC_NODE_TYPE, entry.getDataType());
+            nodes.add(map);
+        }
 
-    private List<S7ConfigFile> getExcelConfig(String path) throws FileNotFoundException {
-        List<S7ConfigFile> configFiles = Poiji.fromExcel(new File(path), S7ConfigFile.class);
-        return configFiles;
+        return nodes;
     }
 
-    private List<S7ConfigFile> getCsvConfig(String path) throws FileNotFoundException {
+    private List<S7ConfigFile> getExcelConfig(InputStream is, String excelFilename) {
+        PoijiExcelType excelType = excelFilename.endsWith("xlsx") ? PoijiExcelType.XLSX : PoijiExcelType.XLS;
+        return Poiji.fromExcel(is, excelType, S7ConfigFile.class);
+    }
 
-        FileReader fr = new FileReader(path);
-        CSVReader reader = new CSVReader(fr, ';');
+    private List<S7ConfigFile> getCsvConfig(String fileContents) {
+        CSVReader reader = new CSVReader(new StringReader(fileContents), ';');
 
-        Map<String, String> mapping = new
-                HashMap<String, String>();
+        Map<String, String> mapping = new HashMap<>();
         mapping.put("Name", "name");
         mapping.put("Logical Address", "logicalAddress");
         mapping.put("Data Type", "dataType");
 
-        HeaderColumnNameTranslateMappingStrategy strategy =
-                new HeaderColumnNameTranslateMappingStrategy();
+        HeaderColumnNameTranslateMappingStrategy<S7ConfigFile> strategy =
+                new HeaderColumnNameTranslateMappingStrategy<>();
         strategy.setType(S7ConfigFile.class);
         strategy.setColumnMapping(mapping);
 
-        CsvToBean<S7ConfigFile> csvToBean = new CsvToBeanBuilder(reader)
+        CsvToBean<S7ConfigFile> csvToBean = new CsvToBeanBuilder<S7ConfigFile>(reader)
                 .withType(S7ConfigFile.class)
                 .withMappingStrategy(strategy)
                 .build();
 
-        List<S7ConfigFile> configFiles = csvToBean.parse();
-        return configFiles;
+        return csvToBean.parse();
     }
 
     /**
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
index 3684623..062ef49 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/set/FileProtocol.java
@@ -19,10 +19,7 @@
 package org.apache.streampipes.connect.protocol.set;
 
 
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
@@ -30,17 +27,24 @@ import org.apache.streampipes.connect.adapter.model.generic.Format;
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.model.generic.Protocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Filetypes;
 import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -51,14 +55,14 @@ public class FileProtocol extends Protocol {
 
     public static final String ID = "org.apache.streampipes.protocol.set.file";
 
-    private String fileUri;
+    private String fileFetchUrl;
 
     public FileProtocol() {
     }
 
-    public FileProtocol(Parser parser, Format format, String fileUri) {
+    public FileProtocol(Parser parser, Format format, String fileFetchUrl) {
         super(parser, format);
-        this.fileUri = fileUri;
+        this.fileFetchUrl = fileFetchUrl;
     }
 
     @Override
@@ -68,18 +72,16 @@ public class FileProtocol extends Protocol {
                 .withLocales(Locales.EN)
                 .sourceType(AdapterSourceType.SET)
                 .category(AdapterType.Generic)
-                .requiredFile(Labels.withId("filePath"))
+                .requiredFile(Labels.withId("filePath"), Filetypes.XML, Filetypes.JSON, Filetypes.CSV)
                 .build();
     }
 
     @Override
     public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
-        ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
-
-        FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName("filePath");
+        StaticPropertyExtractor extractor = StaticPropertyExtractor.from(protocolDescription.getConfig());
 
-        String fileUri = fileStaticProperty.getLocationPath();
-        return new FileProtocol(parser, format, fileUri);
+        String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
+        return new FileProtocol(parser, format, fileFetchUrl);
     }
 
     @Override
@@ -95,18 +97,11 @@ public class FileProtocol extends Protocol {
 
         SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
         try {
-            fr = new FileReader(fileUri);
-            BufferedReader br = new BufferedReader(fr);
-
-            InputStream inn = new FileInputStream(fileUri);
-            parser.parse(inn, stk);
-
-            fr.close();
-        } catch (FileNotFoundException e) {
-            e.printStackTrace();
+            InputStream in = Request.Get(fileFetchUrl).execute().returnContent().asStream();;
+            parser.parse(in, stk);
         } catch (IOException e) {
             e.printStackTrace();
-        }  catch (ParseException e) {
+        } catch (ParseException e) {
             logger.error("Error while parsing: " + e.getMessage());
         }
     }
@@ -152,25 +147,17 @@ public class FileProtocol extends Protocol {
 //        return result;
     }
 
-
     public InputStream getDataFromEndpoint() throws ParseException {
-        FileReader fr = null;
-        InputStream inn = null;
-
         try {
-            fr = new FileReader(fileUri);
-            BufferedReader br = new BufferedReader(fr);
-
-            inn = new FileInputStream(fileUri);
-
+            return Request.Get(fileFetchUrl).execute().returnContent().asStream();
         } catch (FileNotFoundException e) {
-            throw new ParseException("File not found: " + fileUri);
+            throw new ParseException("File not found: " + fileFetchUrl);
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw new ParseException("Could not receive Data from file: " + fileFetchUrl);
         }
-        if (inn == null)
-            throw new ParseException("Could not receive Data from file: " + fileUri);
-
-        return inn;
     }
+
     @Override
     public List<Map<String, Object>> getNElements(int n) throws ParseException {
         List<Map<String, Object>> result = new ArrayList<>();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
index d29b2dc..bfbdb56 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/FileStreamProtocol.java
@@ -18,10 +18,7 @@
 
 package org.apache.streampipes.connect.protocol.stream;
 
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
@@ -31,19 +28,23 @@ import org.apache.streampipes.connect.adapter.model.generic.Protocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaReplayAdapterSink;
-import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.*;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
-import org.apache.streampipes.sdk.helpers.AdapterSourceType;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 public class FileStreamProtocol extends Protocol {
 
@@ -51,7 +52,8 @@ public class FileStreamProtocol extends Protocol {
 
   public static final String ID = "org.apache.streampipes.connect.protocol.stream.file";
 
-  private String filePath;
+  //private String filePath;
+  private String fileFetchUrl;
  // private String timestampKey;
   private boolean replaceTimestamp;
   private float speedUp;
@@ -64,10 +66,10 @@ public class FileStreamProtocol extends Protocol {
   public FileStreamProtocol() {
   }
 
-  public FileStreamProtocol(Parser parser, Format format, String filePath,
+  public FileStreamProtocol(Parser parser, Format format, String fileFetchUrl,
                             boolean replaceTimestamp, float speedUp, int timeBetweenReplay) {
     super(parser, format);
-    this.filePath = filePath;
+    this.fileFetchUrl = fileFetchUrl;
     this.replaceTimestamp = replaceTimestamp;
     this.speedUp = speedUp;
     this.timeBetweenReplay = timeBetweenReplay;
@@ -88,10 +90,10 @@ public class FileStreamProtocol extends Protocol {
                     replaceTimestamp, speedUp));
             format.reset();
             SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
-            InputStream data = getDataFromEndpoint();
+            InputStream dataInputStream = getDataFromEndpoint();
             try {
-              if(data != null) {
-                parser.parse(data, stk);
+              if(dataInputStream != null) {
+                parser.parse(dataInputStream, stk);
               } else {
                 logger.warn("Could not read data from file.");
               }
@@ -116,43 +118,28 @@ public class FileStreamProtocol extends Protocol {
     running = false;
   }
 
-  InputStream getDataFromEndpoint() throws ParseException {
-    FileReader fr = null;
-    InputStream inn = null;
+  private InputStream getDataFromEndpoint() throws ParseException {
     try {
-
-      fr = new FileReader(filePath);
-      BufferedReader br = new BufferedReader(fr);
-
-      inn = new FileInputStream(filePath);
-
-    } catch (FileNotFoundException e) {
-        throw new ParseException("Could not find file: " + filePath);
+      return Request.Get(fileFetchUrl).execute().returnContent().asStream();
+    } catch (IOException e) {
+      throw new ParseException("Could not find file: " + fileFetchUrl);
     }
-
-    if (inn == null)
-        throw new ParseException("Could not receive Data from file: " + filePath);
-
-
-    return inn;
   }
 
   @Override
   public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
-    ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
+    StaticPropertyExtractor extractor = StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());
 
-    List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp");
+    List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
 //    String replaceTimestampString = extractor.selectedSingleValueOption("replaceTimestamp");
     boolean replaceTimestamp = replaceTimestampStringList.size() == 0 ? false : true;
 
-    float speedUp = Float.parseFloat(extractor.singleValue("speed"));
+    float speedUp = extractor.singleValueParameter("speed", Float.class);
 
     int timeBetweenReplay = 1;
 
-    FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName("filePath");
-
-    String fileUri = fileStaticProperty.getLocationPath();
-    return new FileStreamProtocol(parser, format, fileUri, replaceTimestamp, speedUp, timeBetweenReplay);
+    String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
+    return new FileStreamProtocol(parser, format, fileFetchUrl, replaceTimestamp, speedUp, timeBetweenReplay);
   }
 
   private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
@@ -185,7 +172,7 @@ public class FileStreamProtocol extends Protocol {
             .withLocales(Locales.EN)
             .sourceType(AdapterSourceType.STREAM)
             .category(AdapterType.Generic)
-            .requiredFile(Labels.withId("filePath"))
+            .requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
 //            .requiredSingleValueSelection(Labels.withId("replaceTimestamp"),
 //                Options.from("True", "False"))
             .requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
index f7e47f9..a0ea2ba 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
@@ -17,8 +17,6 @@
  */
 package org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata;
 
-import static org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
-
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -34,10 +32,7 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
@@ -49,6 +44,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
+
 public class CsvMetadataEnrichmentController
         extends StandaloneEventProcessingDeclarer<CsvMetadataEnrichmentParameters>
         implements ResolvesContainerProvidedOptions,
@@ -71,7 +68,7 @@ public class CsvMetadataEnrichmentController
                             Labels.withId(MAPPING_FIELD_KEY),
                             PropertyScope.NONE)
                     .build())
-            .requiredFile(Labels.withId(CSV_FILE_KEY))
+            .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
             .requiredSingleValueSelectionFromContainer(Labels.withId(FIELD_TO_MATCH),
                     Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY))
             .requiredMultiValueSelectionFromContainer(Labels.withId(FIELDS_TO_APPEND_KEY),