You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/18 20:54:41 UTC
[streampipes] 01/01: [#1121] Add replay once option to FileStreamProtocol
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1121
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit ecbc43024fe96f390b8bd8f3fc4f96d7967bb068
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Wed Jan 18 21:54:27 2023 +0100
[#1121] Add replay once option to FileStreamProtocol
---
.../iiot/protocol/stream/FileStreamProtocol.java | 65 ++++++++++++++--------
.../strings.en | 3 +
2 files changed, 46 insertions(+), 22 deletions(-)
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 4b588f882..ba6e2a5d2 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -54,6 +54,12 @@ import java.util.concurrent.TimeUnit;
public class FileStreamProtocol extends Protocol {
+ private static final String REPLACE_TIMESTAMP = "replaceTimestamp";
+ private static final String SPEED = "speed";
+ private static final String FILE_PATH = "filePath";
+ private static final String REPLAY_ONCE = "replayOnce";
+
+
private static final Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class);
public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";
@@ -61,6 +67,8 @@ public class FileStreamProtocol extends Protocol {
private String selectedFileName;
private boolean replaceTimestamp;
private float speedUp;
+
+ private boolean replayOnce;
private int timeBetweenReplay;
private ScheduledExecutorService executor;
@@ -73,32 +81,45 @@ public class FileStreamProtocol extends Protocol {
String selectedFileName,
boolean replaceTimestamp,
float speedUp,
- int timeBetweenReplay) {
+ int timeBetweenReplay,
+ boolean replayOnce) {
super(parser, format);
this.selectedFileName = selectedFileName;
this.replaceTimestamp = replaceTimestamp;
this.speedUp = speedUp;
this.timeBetweenReplay = timeBetweenReplay;
+ this.replayOnce = replayOnce;
}
@Override
public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());
-
executor = Executors.newScheduledThreadPool(1);
var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey);
- executor.scheduleAtFixedRate(() -> {
- try (InputStream dataInputStream = getDataFromEndpoint()) {
- format.reset();
- parser.parse(dataInputStream, eventProcessor);
- } catch (ParseException | IOException e) {
- logger.error("Error while parsing: " + e.getMessage());
- }
- }, 0, timeBetweenReplay, TimeUnit.SECONDS);
+ if (replayOnce) {
+ executor.schedule(() -> processFileInput(eventProcessor),
+ 0,
+ TimeUnit.SECONDS);
+ } else {
+ executor.scheduleAtFixedRate(() -> processFileInput(eventProcessor),
+ 0,
+ timeBetweenReplay,
+ TimeUnit.SECONDS);
+ }
}
+ private void processFileInput(LocalEventProcessor eventProcessor) {
+ try (InputStream dataInputStream = getDataFromEndpoint()) {
+ format.reset();
+ parser.parse(dataInputStream, eventProcessor);
+ } catch (ParseException | IOException e) {
+ logger.error("Error while parsing: " + e.getMessage());
+ }
+ }
+
+
private class LocalEventProcessor implements EmitBinaryEvent {
private final IAdapterPipeline adapterPipeline;
@@ -167,18 +188,17 @@ public class FileStreamProtocol extends Protocol {
@Override
public Protocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) {
- StaticPropertyExtractor extractor =
+ var extractor =
StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());
- List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class);
- boolean replaceTimestamp = replaceTimestampStringList.size() != 0;
-
- float speedUp = extractor.singleValueParameter("speed", Float.class);
-
- int timeBetweenReplay = 1;
+ var replaceTimestampStringList = extractor.selectedMultiValues(REPLACE_TIMESTAMP, String.class);
+ var replaceTimestamp = replaceTimestampStringList.size() != 0;
+ var speedUp = extractor.singleValueParameter(SPEED, Float.class);
+ var timeBetweenReplay = 1;
+ var fileName = extractor.selectedFilename(FILE_PATH);
+ var replayOnce = extractor.selectedSingleValue(REPLAY_ONCE, String.class).equals("yes");
- String fileName = extractor.selectedFilename("filePath");
- return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
+ return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay, replayOnce);
}
private String getTimestampKey(EventSchema eventSchema) throws AdapterException {
@@ -198,10 +218,11 @@ public class FileStreamProtocol extends Protocol {
.withLocales(Locales.EN)
.sourceType(AdapterSourceType.STREAM)
.category(AdapterType.Generic)
- .requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
- .requiredMultiValueSelection(Labels.withId("replaceTimestamp"),
+ .requiredFile(Labels.withId(FILE_PATH), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
+ .requiredMultiValueSelection(Labels.withId(REPLACE_TIMESTAMP),
Options.from(""))
- .requiredFloatParameter(Labels.withId("speed"))
+ .requiredSingleValueSelection(Labels.withId(REPLAY_ONCE), Options.from("no", "yes"))
+ .requiredFloatParameter(Labels.withId(SPEED))
.build();
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
index 597753f3f..15a9eb247 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en
@@ -28,3 +28,6 @@ replaceTimestamp.description=Replace Event Time with Current Timestamp
speed.title=Replay Speed
speed.description=original = 1; speedup 2x = 2; half speed = 0.5
+replayOnce.title=Replay Once
+replayOnce.description='yes' file is only replayed once, 'no' the file is replayed till adapter is stopped
+