You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/18 05:47:09 UTC

[incubator-streampipes-extensions] branch STREAMPIPES-130 updated: Finished ImageStreamAdapter

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-130
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/STREAMPIPES-130 by this push:
     new 06da435  Finished ImageStreamAdapter
06da435 is described below

commit 06da435f30f581e09652fb002ca197bfd44454ce
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon May 18 07:46:36 2020 +0200

    Finished ImageStreamAdapter
---
 .../streampipes/connect/ConnectAdapterInit.java    |   2 +
 .../connect/adapters/image/ImageZipAdapter.java    | 104 +++++++++++++++++++++
 .../connect/adapters/image/ImageZipUtils.java      |  32 +++++++
 .../adapters/image/ZipFileImageIterator.java       |  30 +++---
 .../adapters/image/stream/ImageStreamAdapter.java  |  40 ++------
 5 files changed, 164 insertions(+), 44 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index ff9e279..ba24adb 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect;
 
+import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
 import org.apache.streampipes.connect.adapters.iss.IssAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
@@ -86,6 +87,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new NetioRestAdapter())
             .add(new NetioMQTTAdapter())
             .add(new Plc4xS7Adapter())
+            .add(new ImageStreamAdapter())
             .add(new IssAdapter());
 
     String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
new file mode 100644
index 0000000..c2a89ea
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
@@ -0,0 +1,104 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.connect.adapters.image;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.staticproperty.FileStaticProperty;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImageZipAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ImageStreamAdapter.class);
+
+    private AdapterDescription adapterDescription;
+
+    /* Controls whether the adapter is still running or not */
+    private boolean running;
+
+    private Thread task;
+
+    public ImageZipAdapter(AdapterDescription adapterDescription)  {
+        this.adapterDescription = adapterDescription;
+    }
+
+    /**
+     * First extracts the user input and then starts a thread publishing events with images in the zip file
+     * @param adapterPipeline is used to pre-process and publish events on message broker
+     * @param infinite Describes if the replay should be restarted when it is finished or not
+     * @throws AdapterException
+     */
+    public void start(AdapterPipeline adapterPipeline, boolean infinite) throws AdapterException {
+        StaticPropertyExtractor extractor =
+                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+        FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ImageZipUtils.ZIP_FILE_KEY);
+        String fileUri = fileStaticProperty.getLocationPath();
+
+        Integer timeBetweenReplay = extractor.singleValueParameter(ImageZipUtils.INTERVAL_KEY, Integer.class);
+
+        ZipFileImageIterator zipFileImageIterator;
+        try {
+            zipFileImageIterator = new ZipFileImageIterator(fileUri, infinite);
+        } catch (IOException e) {
+            throw new AdapterException("Error while reading images in the zip file");
+        }
+
+        running = true;
+
+        task = new Thread(() -> {
+            while (running) {
+
+                try {
+                    String image = zipFileImageIterator.next();
+
+                    Map<String, Object> result = new HashMap<>();
+                    result.put(ImageZipUtils.TIMESTAMP, System.currentTimeMillis());
+                    result.put(ImageZipUtils.IMAGE, image);
+                    adapterPipeline.process(result);
+                } catch (IOException e) {
+                    LOG.error("Error while reading an image from the zip file " + e.getMessage());
+                }
+
+                try {
+                    Thread.sleep(timeBetweenReplay);
+                } catch (InterruptedException e) {
+                    LOG.error("Error while waiting for next replay round" + e.getMessage());
+                }
+            }
+        });
+        task.start();
+    }
+
+    /**
+     * Stops the running thread that publishes the images
+     */
+    public void stop() {
+        task.interrupt();
+        running = false;
+    }
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java
new file mode 100644
index 0000000..c051ef8
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java
@@ -0,0 +1,32 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.connect.adapters.image;
+
+public class ImageZipUtils {
+
+    /* Key for the static property defining the interval */
+    public static final String INTERVAL_KEY = "interval-key";
+
+    /* Key for the file static property referencing the zip file */
+    public static final String ZIP_FILE_KEY = "zip-file-key";
+
+    /* Runtime name for the timestamp event property */
+    public static final String TIMESTAMP = "timestamp";
+
+    /* Runtime name for the image event property */
+    public static final String IMAGE = "image";
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
index a424300..f2d3755 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
@@ -32,8 +32,12 @@ public class ZipFileImageIterator {
     private List<ZipEntry> allImages;
     private int current;
 
-    public ZipFileImageIterator(String zipFileRoute) throws IOException {
+    /* Defines whether the iterator starts from the beginning or not */
+    private boolean infinite;
+
+    public ZipFileImageIterator(String zipFileRoute, boolean infinite) throws IOException {
         this.zipFile = new ZipFile(zipFileRoute);
+        this.infinite = infinite;
 
         Enumeration<? extends ZipEntry> entries = zipFile.entries();
 
@@ -45,32 +49,30 @@ public class ZipFileImageIterator {
                 allImages.add(entry);
             }
         }
-        current = 0;
+        this.current = 0;
 
     }
 
     public boolean hasNext() {
-        return current < this.allImages.size();
+        return infinite || current < this.allImages.size();
     }
 
     public String next() throws IOException {
+
+        // Reset the current file counter when infinite is true and iterator is at the end
+        if (infinite) {
+            if (current >= this.allImages.size()) {
+                this.current = 0;
+            }
+        }
+
         ZipEntry entry = allImages.get(current);
         InputStream stream = zipFile.getInputStream(entry);
         byte[] bytes = IOUtils.toByteArray(stream);
 
         current++;
         String resultImage = Base64.getEncoder().encodeToString(bytes);
-        return entry.getName();
-    }
-
-    public static void main(String... args) throws IOException {
-        String route = "";
-        ZipFileImageIterator zipFileImageIterator = new ZipFileImageIterator(route);
-
-        while (zipFileImageIterator.hasNext()) {
-            System.out.println(zipFileImageIterator.next());
-        }
-
+        return resultImage;
     }
 
     private static boolean isImage(String name) {
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
index f365e8a..fb3aead 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
@@ -16,32 +16,22 @@ limitations under the License.
 
 package org.apache.streampipes.connect.adapters.image.stream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.iss.IssAdapter;
-import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.connect.adapters.image.ImageZipAdapter;
+import org.apache.streampipes.connect.adapters.image.ImageZipUtils;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.vocabulary.Geo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
 import static org.apache.streampipes.sdk.helpers.EpProperties.*;
 
 public class ImageStreamAdapter extends SpecificDataStreamAdapter {
@@ -50,11 +40,7 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
 
     public static final String ID = "org.apache.streampipes.connect.adapters.image.stream";
 
-    private static final String INTERVAL_KEY = "interval-key";
-    private static final String ZIP_FILE_KEY = "zip-file-key";
-
-    private static final String Timestamp = "timestamp";
-    private static final String Image = "image";
+    private ImageZipAdapter imageZipAdapter;
 
     public ImageStreamAdapter() {
 
@@ -69,8 +55,8 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
         SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
                 .withLocales(Locales.EN)
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .requiredIntegerParameter(Labels.withId(INTERVAL_KEY))
-                .requiredFile(Labels.withId(ZIP_FILE_KEY))
+                .requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
+                .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
                 .build();
         description.setAppId(ID);
 
@@ -79,26 +65,20 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
 
     @Override
     public void startAdapter() throws AdapterException {
-        StaticPropertyExtractor extractor =
-                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
-        FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ZIP_FILE_KEY);
-
-        String fileUri = fileStaticProperty.getLocationPath();
-
+        imageZipAdapter = new ImageZipAdapter(adapterDescription);
+        imageZipAdapter.start(adapterPipeline, true);
     }
 
-
-
     @Override
     public void stopAdapter() throws AdapterException {
-
+        imageZipAdapter.stop();
     }
 
     @Override
     public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
         return GuessSchemaBuilder.create()
-                .property(timestampProperty(Timestamp))
-                .property(imageProperty("image"))
+                .property(timestampProperty(ImageZipUtils.TIMESTAMP))
+                .property(imageProperty(ImageZipUtils.IMAGE))
                 .build();
     }