You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/18 05:47:09 UTC
[incubator-streampipes-extensions] branch STREAMPIPES-130 updated:
Finished ImageStreamAdapter
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-130
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/STREAMPIPES-130 by this push:
new 06da435 Finished ImageStreamAdapter
06da435 is described below
commit 06da435f30f581e09652fb002ca197bfd44454ce
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon May 18 07:46:36 2020 +0200
Finished ImageStreamAdapter
---
.../streampipes/connect/ConnectAdapterInit.java | 2 +
.../connect/adapters/image/ImageZipAdapter.java | 104 +++++++++++++++++++++
.../connect/adapters/image/ImageZipUtils.java | 32 +++++++
.../adapters/image/ZipFileImageIterator.java | 30 +++---
.../adapters/image/stream/ImageStreamAdapter.java | 40 ++------
5 files changed, 164 insertions(+), 44 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index ff9e279..ba24adb 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect;
+import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
import org.apache.streampipes.connect.adapters.iss.IssAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
@@ -86,6 +87,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
.add(new NetioRestAdapter())
.add(new NetioMQTTAdapter())
.add(new Plc4xS7Adapter())
+ .add(new ImageStreamAdapter())
.add(new IssAdapter());
String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
new file mode 100644
index 0000000..c2a89ea
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipAdapter.java
@@ -0,0 +1,104 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.connect.adapters.image;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.staticproperty.FileStaticProperty;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImageZipAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ImageStreamAdapter.class);
+
+ private AdapterDescription adapterDescription;
+
+ /* Controls whether the adapter is still running or not */
+ private boolean running;
+
+ private Thread task;
+
+ public ImageZipAdapter(AdapterDescription adapterDescription) {
+ this.adapterDescription = adapterDescription;
+ }
+
+ /**
+ * First extracts the user input and then starts a thread publishing events with images in the zip file
+ * @param adapterPipeline is used to pre-process and publish events on message broker
+ * @param infinite Describes if the replay should be restarted when it is finished or not
+ * @throws AdapterException
+ */
+ public void start(AdapterPipeline adapterPipeline, boolean infinite) throws AdapterException {
+ StaticPropertyExtractor extractor =
+ StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+ FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ImageZipUtils.ZIP_FILE_KEY);
+ String fileUri = fileStaticProperty.getLocationPath();
+
+ Integer timeBetweenReplay = extractor.singleValueParameter(ImageZipUtils.INTERVAL_KEY, Integer.class);
+
+ ZipFileImageIterator zipFileImageIterator;
+ try {
+ zipFileImageIterator = new ZipFileImageIterator(fileUri, infinite);
+ } catch (IOException e) {
+ throw new AdapterException("Error while reading images in the zip file");
+ }
+
+ running = true;
+
+ task = new Thread(() -> {
+ while (running) {
+
+ try {
+ String image = zipFileImageIterator.next();
+
+ Map<String, Object> result = new HashMap<>();
+ result.put(ImageZipUtils.TIMESTAMP, System.currentTimeMillis());
+ result.put(ImageZipUtils.IMAGE, image);
+ adapterPipeline.process(result);
+ } catch (IOException e) {
+ LOG.error("Error while reading an image from the zip file " + e.getMessage());
+ }
+
+ try {
+ Thread.sleep(timeBetweenReplay);
+ } catch (InterruptedException e) {
+ LOG.error("Error while waiting for next replay round" + e.getMessage());
+ }
+ }
+ });
+ task.start();
+ }
+
+ /**
+ * Stops the running thread that publishes the images
+ */
+ public void stop() {
+ task.interrupt();
+ running = false;
+ }
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java
new file mode 100644
index 0000000..c051ef8
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ImageZipUtils.java
@@ -0,0 +1,32 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.connect.adapters.image;
+
+public class ImageZipUtils {
+
+ /* Key for the static property defining the interval */
+ public static final String INTERVAL_KEY = "interval-key";
+
+ /* Key for the file static property referencing the zip file */
+ public static final String ZIP_FILE_KEY = "zip-file-key";
+
+ /* Runtime name for the timestamp event property */
+ public static final String TIMESTAMP = "timestamp";
+
+ /* Runtime name for the image event property */
+ public static final String IMAGE = "image";
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
index a424300..f2d3755 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/ZipFileImageIterator.java
@@ -32,8 +32,12 @@ public class ZipFileImageIterator {
private List<ZipEntry> allImages;
private int current;
- public ZipFileImageIterator(String zipFileRoute) throws IOException {
+ /* Defines whether the iterator starts from the beginning or not */
+ private boolean infinite;
+
+ public ZipFileImageIterator(String zipFileRoute, boolean infinite) throws IOException {
this.zipFile = new ZipFile(zipFileRoute);
+ this.infinite = infinite;
Enumeration<? extends ZipEntry> entries = zipFile.entries();
@@ -45,32 +49,30 @@ public class ZipFileImageIterator {
allImages.add(entry);
}
}
- current = 0;
+ this.current = 0;
}
public boolean hasNext() {
- return current < this.allImages.size();
+ return infinite || current < this.allImages.size();
}
public String next() throws IOException {
+
+ // Reset the current file counter when infinite is true and iterator is at the end
+ if (infinite) {
+ if (current >= this.allImages.size()) {
+ this.current = 0;
+ }
+ }
+
ZipEntry entry = allImages.get(current);
InputStream stream = zipFile.getInputStream(entry);
byte[] bytes = IOUtils.toByteArray(stream);
current++;
String resultImage = Base64.getEncoder().encodeToString(bytes);
- return entry.getName();
- }
-
- public static void main(String... args) throws IOException {
- String route = "";
- ZipFileImageIterator zipFileImageIterator = new ZipFileImageIterator(route);
-
- while (zipFileImageIterator.hasNext()) {
- System.out.println(zipFileImageIterator.next());
- }
-
+ return resultImage;
}
private static boolean isImage(String name) {
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
index f365e8a..fb3aead 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/image/stream/ImageStreamAdapter.java
@@ -16,32 +16,22 @@ limitations under the License.
package org.apache.streampipes.connect.adapters.image.stream;
-import org.apache.commons.io.IOUtils;
import org.apache.streampipes.connect.adapter.Adapter;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.adapter.exception.ParseException;
import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.iss.IssAdapter;
-import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.connect.adapters.image.ImageZipAdapter;
+import org.apache.streampipes.connect.adapters.image.ImageZipUtils;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.FileStaticProperty;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.vocabulary.Geo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
import static org.apache.streampipes.sdk.helpers.EpProperties.*;
public class ImageStreamAdapter extends SpecificDataStreamAdapter {
@@ -50,11 +40,7 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
public static final String ID = "org.apache.streampipes.connect.adapters.image.stream";
- private static final String INTERVAL_KEY = "interval-key";
- private static final String ZIP_FILE_KEY = "zip-file-key";
-
- private static final String Timestamp = "timestamp";
- private static final String Image = "image";
+ private ImageZipAdapter imageZipAdapter;
public ImageStreamAdapter() {
@@ -69,8 +55,8 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredIntegerParameter(Labels.withId(INTERVAL_KEY))
- .requiredFile(Labels.withId(ZIP_FILE_KEY))
+ .requiredIntegerParameter(Labels.withId(ImageZipUtils.INTERVAL_KEY))
+ .requiredFile(Labels.withId(ImageZipUtils.ZIP_FILE_KEY))
.build();
description.setAppId(ID);
@@ -79,26 +65,20 @@ public class ImageStreamAdapter extends SpecificDataStreamAdapter {
@Override
public void startAdapter() throws AdapterException {
- StaticPropertyExtractor extractor =
- StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
- FileStaticProperty fileStaticProperty = (FileStaticProperty) extractor.getStaticPropertyByName(ZIP_FILE_KEY);
-
- String fileUri = fileStaticProperty.getLocationPath();
-
+ imageZipAdapter = new ImageZipAdapter(adapterDescription);
+ imageZipAdapter.start(adapterPipeline, true);
}
-
-
@Override
public void stopAdapter() throws AdapterException {
-
+ imageZipAdapter.stop();
}
@Override
public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
return GuessSchemaBuilder.create()
- .property(timestampProperty(Timestamp))
- .property(imageProperty("image"))
+ .property(timestampProperty(ImageZipUtils.TIMESTAMP))
+ .property(imageProperty(ImageZipUtils.IMAGE))
.build();
}