You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/20 19:42:30 UTC
[streampipes] branch SP-1133 updated: Minor improvements to code style (#1133)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch SP-1133
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/SP-1133 by this push:
new 5ad116436 Minor improvements to code style (#1133)
5ad116436 is described below
commit 5ad11643625fe3f954adf2747b6ebe37a7b27238
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 20 20:42:17 2023 +0100
Minor improvements to code style (#1133)
---
.../standalone/function/StreamPipesFunction.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 8f79526e2..eb0a49c27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -65,12 +65,15 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
@Override
public void invokeRuntime(String serviceGroup) {
var functionId = this.getFunctionConfig().getFunctionId();
- this.outputCollectors = this.getOutputCollectors();
- this.outputCollectors.forEach((key, value) -> value.connect());
- FunctionContext context = new FunctionContextGenerator(
- functionId.getId(), serviceGroup, this.requiredStreamIds(), this.outputCollectors)
- .generate();
+ this.initializeProducers();
+
+ var context = new FunctionContextGenerator(
+ functionId.getId(),
+ serviceGroup,
+ this.requiredStreamIds(),
+ this.outputCollectors
+ ).generate();
// Creates a source info for each incoming SpDataStream
// The index is used to create the selector prefix for the SourceInfo
@@ -87,6 +90,7 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
this.inputCollectors = getInputCollectors(context.getStreams());
LOG.info("Invoking function {}:{}", functionId.getId(), functionId.getVersion());
+
onServiceStarted(context);
registerConsumers();
}
@@ -134,6 +138,11 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)));
}
+ private void initializeProducers() {
+ this.outputCollectors = this.getOutputCollectors();
+ this.outputCollectors.forEach((key, value) -> value.connect());
+ }
+
private Map<String, SpOutputCollector> getOutputCollectors() {
this.getFunctionConfig().getOutputDataStreams().forEach((key, value) -> {
this.outputCollectors.put(