You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/20 19:42:30 UTC

[streampipes] branch SP-1133 updated: Minor improvements to code style (#1133)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch SP-1133
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/SP-1133 by this push:
     new 5ad116436 Minor improvements to code style (#1133)
5ad116436 is described below

commit 5ad11643625fe3f954adf2747b6ebe37a7b27238
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 20 20:42:17 2023 +0100

    Minor improvements to code style (#1133)
---
 .../standalone/function/StreamPipesFunction.java      | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 8f79526e2..eb0a49c27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -65,12 +65,15 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
   @Override
   public void invokeRuntime(String serviceGroup) {
     var functionId = this.getFunctionConfig().getFunctionId();
-    this.outputCollectors = this.getOutputCollectors();
-    this.outputCollectors.forEach((key, value) -> value.connect());
 
-    FunctionContext context = new FunctionContextGenerator(
-        functionId.getId(), serviceGroup, this.requiredStreamIds(), this.outputCollectors)
-        .generate();
+    this.initializeProducers();
+
+    var context = new FunctionContextGenerator(
+        functionId.getId(),
+        serviceGroup,
+        this.requiredStreamIds(),
+        this.outputCollectors
+    ).generate();
 
     // Creates a source info for each incoming SpDataStream
     // The index is used to create the selector prefix for the SourceInfo
@@ -87,6 +90,7 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
     this.inputCollectors = getInputCollectors(context.getStreams());
 
     LOG.info("Invoking function {}:{}", functionId.getId(), functionId.getVersion());
+
     onServiceStarted(context);
     registerConsumers();
   }
@@ -134,6 +138,11 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
         SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)));
   }
 
+  private void initializeProducers() {
+    this.outputCollectors = this.getOutputCollectors();
+    this.outputCollectors.forEach((key, value) -> value.connect());
+  }
+
   private Map<String, SpOutputCollector> getOutputCollectors() {
     this.getFunctionConfig().getOutputDataStreams().forEach((key, value) -> {
       this.outputCollectors.put(