You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/18 20:54:41 UTC

[streampipes] 01/01: [#1121] Add replay once option to FileStreamProtocol

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1121
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ecbc43024fe96f390b8bd8f3fc4f96d7967bb068
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Wed Jan 18 21:54:27 2023 +0100

    [#1121] Add replay once option to FileStreamProtocol
---
 .../iiot/protocol/stream/FileStreamProtocol.java   | 65 ++++++++++++++--------
 .../strings.en                                     |  3 +
 2 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 4b588f882..ba6e2a5d2 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -54,6 +54,12 @@ import java.util.concurrent.TimeUnit;
 
 public class FileStreamProtocol extends Protocol {
 
+  private static final String REPLACE_TIMESTAMP = "replaceTimestamp";
+  private static final String SPEED = "speed";
+  private static final String FILE_PATH = "filePath";
+  private static final String REPLAY_ONCE = "replayOnce";
+
+
   private static final Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class);
 
   public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";
@@ -61,6 +67,8 @@ public class FileStreamProtocol extends Protocol {
   private String selectedFileName;
   private boolean replaceTimestamp;
   private float speedUp;
+
+  private boolean replayOnce;
   private int timeBetweenReplay;
 
   private ScheduledExecutorService executor;
@@ -73,32 +81,45 @@ public class FileStreamProtocol extends Protocol {
                             String selectedFileName,
                             boolean replaceTimestamp,
                             float speedUp,
-                            int timeBetweenReplay) {
+                            int timeBetweenReplay,
+                            boolean replayOnce) {
     super(parser, format);
     this.selectedFileName = selectedFileName;
     this.replaceTimestamp = replaceTimestamp;
     this.speedUp = speedUp;
     this.timeBetweenReplay = timeBetweenReplay;
+    this.replayOnce = replayOnce;
   }
 
   @Override
   public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
     String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());
 
-
     executor = Executors.newScheduledThreadPool(1);
     var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey);
 
-    executor.scheduleAtFixedRate(() -> {
-      try (InputStream dataInputStream = getDataFromEndpoint()) {
-        format.reset();
-        parser.parse(dataInputStream, eventProcessor);
-      } catch (ParseException | IOException e) {
-        logger.error("Error while parsing: " + e.getMessage());
-      }
-    }, 0, timeBetweenReplay, TimeUnit.SECONDS);
+    if (replayOnce) {
+      executor.schedule(() -> processFileInput(eventProcessor),
+          0,
+          TimeUnit.SECONDS);
+    } else {
+      executor.scheduleAtFixedRate(() -> processFileInput(eventProcessor),
+          0,
+          timeBetweenReplay,
+          TimeUnit.SECONDS);
+    }
   }
 
+  private void processFileInput(LocalEventProcessor eventProcessor) {
+    try (InputStream dataInputStream = getDataFromEndpoint()) {
+      format.reset();
+      parser.parse(dataInputStream, eventProcessor);
+    } catch (ParseException | IOException e) {
+      logger.error("Error while parsing: " + e.getMessage());
+    }
+  }
+
+
   private class LocalEventProcessor implements EmitBinaryEvent {
 
     private final IAdapterPipeline adapterPipeline;
@@ -167,18 +188,17 @@ public class FileStreamProtocol extends Protocol {
 
   @Override
   public Protocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) {
-    StaticPropertyExtractor extractor =
+    var extractor =
         StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());
 
-    List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
-    boolean replaceTimestamp = replaceTimestampStringList.size() != 0;
-
-    float speedUp = extractor.singleValueParameter("speed", Float.class);
-
-    int timeBetweenReplay = 1;
+    var replaceTimestampStringList = extractor.selectedMultiValues(REPLACE_TIMESTAMP, String.class);
+    var replaceTimestamp = replaceTimestampStringList.size() != 0;
+    var speedUp = extractor.singleValueParameter(SPEED, Float.class);
+    var timeBetweenReplay = 1;
+    var fileName = extractor.selectedFilename(FILE_PATH);
+    var replayOnce = extractor.selectedSingleValue(REPLAY_ONCE, String.class).equals("yes");
 
-    String fileName = extractor.selectedFilename("filePath");
-    return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
+    return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay, replayOnce);
   }
 
   private String getTimestampKey(EventSchema eventSchema) throws AdapterException {
@@ -198,10 +218,11 @@ public class FileStreamProtocol extends Protocol {
         .withLocales(Locales.EN)
         .sourceType(AdapterSourceType.STREAM)
         .category(AdapterType.Generic)
-        .requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
-        .requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
+        .requiredFile(Labels.withId(FILE_PATH), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
+        .requiredMultiValueSelection(Labels.withId(REPLACE_TIMESTAMP),
             Options.from(""))
-        .requiredFloatParameter(Labels.withId("speed"))
+        .requiredSingleValueSelection(Labels.withId(REPLAY_ONCE), Options.from("no", "yes"))
+        .requiredFloatParameter(Labels.withId(SPEED))
         .build();
   }
 
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
index 597753f3f..15a9eb247 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
@@ -28,3 +28,6 @@ replaceTimestamp.description=Replace Event Time with Current Timestamp
 speed.title=Replay Speed
 speed.description=original = 1; speedup 2x = 2; half speed  =  0.5
 
+replayOnce.title=Replay Once
+replayOnce.description='yes' replays the file only once, 'no' replays the file until the adapter is stopped
+