You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/04/29 14:47:35 UTC

[streampipes] branch dev updated: Map http 404 to Java Optional.empty [#1520] (#1521)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5674efe41 Map http 404 to Java Optional.empty [#1520] (#1521)
5674efe41 is described below

commit 5674efe41ae4a67d1e2565240af373f745eb0673
Author: Stefan Obermeier <ob...@apache.org>
AuthorDate: Sat Apr 29 16:47:29 2023 +0200

    Map http 404 to Java Optional.empty [#1520] (#1521)
    
    * Add variable to set location of checkstyle base path
    
    * Fix checkstyle.config.base.path in streampipes-client
    
    * Map REST 404 to Optional.empty in Java
    
    * Return 404 if requested pipeline does not exist
---
 .../client/api/AbstractTypedClientApi.java         | 14 +++-
 .../org/apache/streampipes/client/api/CRUDApi.java |  3 +-
 .../streampipes/client/api/DataLakeMeasureApi.java |  3 +-
 .../streampipes/client/api/DataProcessorApi.java   |  3 +-
 .../apache/streampipes/client/api/DataSinkApi.java |  3 +-
 .../streampipes/client/api/DataStreamApi.java      |  3 +-
 .../apache/streampipes/client/api/PipelineApi.java |  3 +-
 .../client/api/PipelineElementTemplateApi.java     |  3 +-
 .../streampipes/client/http/HttpRequest.java       | 18 ++--
 .../commons/exceptions/SpHttpErrorStatusCode.java  | 28 ++++---
 .../shared/impl/AbstractSharedRestInterface.java   |  7 +-
 .../streampipes/rest/impl/PipelineResource.java    | 97 +++++++++-------------
 .../function/FunctionContextGenerator.java         |  5 +-
 13 files changed, 101 insertions(+), 89 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
index abd60c439..ff3964bd8 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
@@ -22,9 +22,11 @@ import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.serializer.ListSerializer;
 import org.apache.streampipes.client.serializer.ObjectSerializer;
 import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpHttpErrorStatusCode;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 
 import java.util.List;
+import java.util.Optional;
 
 public abstract class AbstractTypedClientApi<T> extends AbstractClientApi {
 
@@ -40,9 +42,17 @@ public abstract class AbstractTypedClientApi<T> extends AbstractClientApi {
     return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
   }
 
-  protected T getSingle(StreamPipesApiPath apiPath) throws SpRuntimeException {
+  protected Optional<T> getSingle(StreamPipesApiPath apiPath) throws SpRuntimeException {
     ObjectSerializer<Void, T> serializer = new ObjectSerializer<>();
-    return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
+    try {
+      return Optional.of(new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest());
+    } catch (SpHttpErrorStatusCode e) {
+      if (e.getHttpStatusCode() == 404) {
+        return Optional.empty();
+      } else {
+        throw e;
+      }
+    }
   }
 
   protected abstract StreamPipesApiPath getBaseResourcePath();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
index 6cabcc09a..cbe422185 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
@@ -18,10 +18,11 @@
 package org.apache.streampipes.client.api;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface CRUDApi<K, V> {
 
-  V get(K id);
+  Optional<V> get(K id);
 
   List<V> all();
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
index d59b4898e..702723ad4 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.client.util.StreamPipesApiPath;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
 import java.util.List;
+import java.util.Optional;
 
 public class DataLakeMeasureApi extends AbstractTypedClientApi<DataLakeMeasure>
     implements CRUDApi<String, DataLakeMeasure> {
@@ -31,7 +32,7 @@ public class DataLakeMeasureApi extends AbstractTypedClientApi<DataLakeMeasure>
     super(clientConfig, DataLakeMeasure.class);
   }
 
-  public DataLakeMeasure get(String id) {
+  public Optional<DataLakeMeasure> get(String id) {
     return getSingle(getBaseResourcePath().addToPath(id));
   }
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
index 656d5cf18..64f7ea7ee 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 
 import java.util.List;
+import java.util.Optional;
 
 public class DataProcessorApi extends AbstractTypedClientApi<DataProcessorInvocation>
     implements CRUDApi<String, DataProcessorInvocation> {
@@ -43,7 +44,7 @@ public class DataProcessorApi extends AbstractTypedClientApi<DataProcessorInvoca
   }
 
   @Override
-  public DataProcessorInvocation get(String s) {
+  public Optional<DataProcessorInvocation> get(String s) {
     return getSingle(getBaseResourcePath().addToPath(s));
   }
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
index d53d21047..694d458d6 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 
 import java.util.List;
+import java.util.Optional;
 
 public class DataSinkApi extends AbstractTypedClientApi<DataSinkInvocation>
     implements CRUDApi<String, DataSinkInvocation> {
@@ -36,7 +37,7 @@ public class DataSinkApi extends AbstractTypedClientApi<DataSinkInvocation>
   }
 
   @Override
-  public DataSinkInvocation get(String s) {
+  public Optional<DataSinkInvocation> get(String s) {
     return getSingle(getBaseResourcePath().addToPath(s));
   }
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
index d3b8d2842..dd0c3e824 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.model.message.Message;
 
 import java.net.URLEncoder;
 import java.util.List;
+import java.util.Optional;
 
 public class DataStreamApi extends AbstractTypedClientApi<SpDataStream> implements CRUDApi<String, SpDataStream> {
 
@@ -36,7 +37,7 @@ public class DataStreamApi extends AbstractTypedClientApi<SpDataStream> implemen
   }
 
   @Override
-  public SpDataStream get(String streamId) {
+  public Optional<SpDataStream> get(String streamId) {
     return getSingle(StreamPipesApiPath.fromBaseApiPath()
         .addToPath("streams").addToPath(streamId));
   }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 888c15df7..8cc180057 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
+import java.util.Optional;
 
 public class PipelineApi extends AbstractTypedClientApi<Pipeline> implements CRUDApi<String, Pipeline> {
 
@@ -34,7 +35,7 @@ public class PipelineApi extends AbstractTypedClientApi<Pipeline> implements CRU
   }
 
   @Override
-  public Pipeline get(String pipelineId) {
+  public Optional<Pipeline> get(String pipelineId) {
     return getSingle(getBaseResourcePath().addToPath(pipelineId));
   }
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
index 3038fefdd..33c9c22c5 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.client.util.StreamPipesApiPath;
 import org.apache.streampipes.model.template.PipelineElementTemplate;
 
 import java.util.List;
+import java.util.Optional;
 
 public class PipelineElementTemplateApi extends AbstractTypedClientApi<PipelineElementTemplate>
     implements CRUDApi<String, PipelineElementTemplate> {
@@ -31,7 +32,7 @@ public class PipelineElementTemplateApi extends AbstractTypedClientApi<PipelineE
   }
 
   @Override
-  public PipelineElementTemplate get(String id) {
+  public Optional<PipelineElementTemplate> get(String id) {
     return getSingle(getBaseResourcePath().addToPath(id));
   }
 
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
index b0ffc92ad..e34a1225b 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
 import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.serializer.Serializer;
 import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpHttpErrorStatusCode;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -100,14 +101,19 @@ public abstract class HttpRequest<K, V, T> {
       if (status.getStatusCode() == HttpStatus.SC_OK) {
         return afterRequest(serializer, response.getEntity());
       } else {
-        if (status.getStatusCode() == 401) {
-          throw new SpRuntimeException(
-              " 401 - Access to this resource is forbidden - did you provide a poper API key or client secret?");
-        } else {
-          throw new SpRuntimeException(status.getStatusCode() + " - " + status.getReasonPhrase());
+        switch (status.getStatusCode()) {
+          case HttpStatus.SC_UNAUTHORIZED:
+            throw new SpHttpErrorStatusCode(
+                " 401 - Access to this resource is forbidden - did you provide a poper API key or client secret?",
+                401);
+          case HttpStatus.SC_NOT_FOUND:
+            throw new SpHttpErrorStatusCode(" 404 - The requested resource could not be found.", 404);
+          default:
+            throw new SpHttpErrorStatusCode(status.getStatusCode() + " - " + status.getReasonPhrase(),
+                status.getStatusCode());
         }
       }
-    } catch (IOException | SpRuntimeException e) {
+    } catch (IOException e) {
       throw new SpRuntimeException(
           "Could not connect to the StreamPipes API - please check that StreamPipes is available", e);
     }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
similarity index 55%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
index 6cabcc09a..a4188e966 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,19 +15,25 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.client.api;
 
-import java.util.List;
+package org.apache.streampipes.commons.exceptions;
 
-public interface CRUDApi<K, V> {
+public class SpHttpErrorStatusCode extends SpRuntimeException {
 
-  V get(K id);
+  private static final long serialVersionUID = 2289470758226670270L;
+  private Integer httpStatusCode;
 
-  List<V> all();
+  /**
+   * Creates a new Exception with the given message and null as the cause.
+   *
+   * @param message The exception message
+   */
+  public SpHttpErrorStatusCode(String message, Integer httpStatusCode) {
+    super(message);
+    this.httpStatusCode = httpStatusCode;
+  }
 
-  void create(V element);
-
-  void delete(K id);
-
-  void update(V element);
+  public Integer getHttpStatusCode() {
+    return httpStatusCode;
+  }
 }
diff --git a/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/impl/AbstractSharedRestInterface.java b/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/impl/AbstractSharedRestInterface.java
index 4d5b78a9c..74897e29b 100644
--- a/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/impl/AbstractSharedRestInterface.java
+++ b/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/impl/AbstractSharedRestInterface.java
@@ -31,12 +31,15 @@ public abstract class AbstractSharedRestInterface {
     return error(entity, 400);
   }
 
+  protected <T> Response notFound(T entity) {
+    return error(entity, 404);
+  }
+
   protected <T> Response serverError(T entity) {
     return error(entity, 500);
   }
 
-  protected <T> Response error(T entity,
-                               Integer statusCode) {
+  protected <T> Response error(T entity, Integer statusCode) {
     return Response
         .status(statusCode)
         .entity(entity)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index eafa7179d..b71064f3c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.rest.impl;
 
-
 import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
 import org.apache.streampipes.commons.exceptions.NoMatchingJsonSchemaException;
 import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
@@ -76,14 +75,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Get all pipelines of the current user",
-      tags = {"Pipeline"},
-      responses = {
-          @ApiResponse(content = {
-              @Content(
+  @Operation(summary = "Get all pipelines of the current user", tags = {"Pipeline"}, responses = {
+      @ApiResponse(content = {
+          @Content(
                   mediaType = "application/json",
-                  array = @ArraySchema(schema = @Schema(implementation = Pipeline.class)))
-          })})
+                  array = @ArraySchema(schema = @Schema(implementation = Pipeline.class))
+          )})})
   @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_PRIVILEGE)
   @PostFilter("hasPermission(filterObject.pipelineId, 'READ')")
   public List<Pipeline> get() {
@@ -94,8 +91,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/{pipelineId}/status")
   @JacksonSerialized
-  @Operation(summary = "Get the pipeline status of a given pipeline",
-      tags = {"Pipeline"})
+  @Operation(summary = "Get the pipeline status of a given pipeline", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_PRIVILEGE)
   public Response getPipelineStatus(@PathParam("pipelineId") String pipelineId) {
     return ok(PipelineStatusManager.getPipelineStatus(pipelineId, 5));
@@ -105,8 +101,7 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Path("/{pipelineId}")
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Delete a pipeline with a given id",
-      tags = {"Pipeline"})
+  @Operation(summary = "Delete a pipeline with a given id", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_DELETE_PIPELINE_PRIVILEGE)
   public Response removeOwn(@PathParam("pipelineId") String pipelineId) {
     PipelineManager.deletePipeline(pipelineId);
@@ -117,19 +112,23 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Path("/{pipelineId}")
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Get a specific pipeline with the given id",
-      tags = {"Pipeline"})
+  @Operation(summary = "Get a specific pipeline with the given id", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_PRIVILEGE)
   public Response getElement(@PathParam("pipelineId") String pipelineId) {
-    return ok(PipelineManager.getPipeline(pipelineId));
+    Pipeline foundPipeline = PipelineManager.getPipeline(pipelineId);
+
+    if (foundPipeline == null) {
+      return notFound("Pipeline with " + pipelineId + " not found.");
+    } else {
+      return ok(PipelineManager.getPipeline(pipelineId));
+    }
   }
 
   @Path("/{pipelineId}/start")
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Start the pipeline with the given id",
-      tags = {"Pipeline"})
+  @Operation(summary = "Start the pipeline with the given id", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   public Response start(@PathParam("pipelineId") String pipelineId) {
     try {
@@ -146,28 +145,24 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Stop the pipeline with the given id",
-      tags = {"Pipeline"})
+  @Operation(summary = "Stop the pipeline with the given id", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   public Response stop(@PathParam("pipelineId") String pipelineId,
-                       @QueryParam("forceStop") @DefaultValue("false") boolean forceStop) {
+             @QueryParam("forceStop") @DefaultValue("false") boolean forceStop) {
     try {
       PipelineOperationStatus status = PipelineManager.stopPipeline(pipelineId, forceStop);
       return ok(status);
-    } catch
-    (Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
-      return constructErrorMessage(
-          new Notification(NotificationType.UNKNOWN_ERROR.title(), NotificationType.UNKNOWN_ERROR.description(),
-              e.getMessage()));
+      return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR.title(),
+          NotificationType.UNKNOWN_ERROR.description(), e.getMessage()));
     }
   }
 
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Store a new pipeline",
-      tags = {"Pipeline"})
+  @Operation(summary = "Store a new pipeline", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   public Response addPipeline(Pipeline pipeline) {
 
@@ -184,21 +179,17 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Hidden
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   @PostAuthorize("hasPermission(returnObject, 'READ')")
-  public PipelineElementRecommendationMessage recommend(Pipeline pipeline,
-                                                        @PathParam("recId") String baseRecElement) {
+  public PipelineElementRecommendationMessage recommend(Pipeline pipeline, @PathParam("recId") String baseRecElement) {
     try {
       return Operations.findRecommendedElements(pipeline, baseRecElement);
     } catch (JsonSyntaxException e) {
-      throw new WebApplicationException(badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
-          e.getMessage())));
+      throw new WebApplicationException(badRequest(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage())));
     } catch (NoSuitableSepasAvailableException e) {
-      throw new WebApplicationException(badRequest(new Notification(NotificationType.NO_SEPA_FOUND,
-          e.getMessage())));
+      throw new WebApplicationException(badRequest(new Notification(NotificationType.NO_SEPA_FOUND, e.getMessage())));
     } catch (Exception e) {
       e.printStackTrace();
       throw new WebApplicationException(
-          serverError(constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR,
-              e.getMessage()))));
+          serverError(constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage()))));
     }
   }
 
@@ -224,26 +215,20 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
     try {
       return ok(Operations.validatePipeline(pipeline));
     } catch (JsonSyntaxException e) {
-      return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
-          e.getMessage()));
+      return badRequest(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage()));
     } catch (NoMatchingSchemaException e) {
-      return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION,
-          e.getMessage()));
+      return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION, e.getMessage()));
     } catch (NoMatchingFormatException e) {
-      return badRequest(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION,
-          e.getMessage()));
+      return badRequest(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION, e.getMessage()));
     } catch (NoMatchingProtocolException e) {
-      return badRequest(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION,
-          e.getMessage()));
+      return badRequest(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION, e.getMessage()));
     } catch (RemoteServerNotAccessibleException | NoMatchingJsonSchemaException e) {
-      return serverError(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE
-          , e.getMessage()));
+      return serverError(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE, e.getMessage()));
     } catch (InvalidConnectionException e) {
       return badRequest(e.getErrorLog());
     } catch (Exception e) {
       e.printStackTrace();
-      return serverError(new Notification(NotificationType.UNKNOWN_ERROR,
-          e.getMessage()));
+      return serverError(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage()));
     }
   }
 
@@ -251,11 +236,9 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Path("/{pipelineId}")
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
-  @Operation(summary = "Update an existing pipeline",
-      tags = {"Pipeline"})
+  @Operation(summary = "Update an existing pipeline", tags = {"Pipeline"})
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
-  public Response overwritePipeline(@PathParam("pipelineId") String pipelineId,
-                                    Pipeline pipeline) {
+  public Response overwritePipeline(@PathParam("pipelineId") String pipelineId, Pipeline pipeline) {
     Pipeline storedPipeline = getPipelineStorage().getPipeline(pipelineId);
     if (!storedPipeline.isRunning()) {
       storedPipeline.setStreams(pipeline.getStreams());
@@ -275,14 +258,10 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/contains/{elementId}")
   @JacksonSerialized
-  @Operation(summary = "Returns all pipelines that contain the element with the elementId",
-      tags = {"Pipeline"},
-      responses = {
-          @ApiResponse(content = {
-              @Content(
-                  mediaType = "application/json",
-                  array = @ArraySchema(schema = @Schema(implementation = Pipeline.class)))
-          })})
+  @Operation(summary = "Returns all pipelines that contain the element with the elementId", tags = {
+      "Pipeline"}, responses = {@ApiResponse(content = {
+        @Content(mediaType = "application/json",
+          array = @ArraySchema(schema = @Schema(implementation = Pipeline.class)))})})
   @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_PRIVILEGE)
   @PostFilter("hasPermission(filterObject.pipelineId, 'READ')")
   public List<Pipeline> getPipelinesContainingElement(@PathParam("elementId") String elementId) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContextGenerator.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContextGenerator.java
index c3465b413..9b56d0d07 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContextGenerator.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContextGenerator.java
@@ -25,7 +25,7 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Optional;
 
 public class FunctionContextGenerator {
 
@@ -60,6 +60,7 @@ public class FunctionContextGenerator {
     return this.streamIds
         .stream()
         .map(streamId -> client.streams().get(streamId))
-        .collect(Collectors.toList());
+        .flatMap(Optional::stream)
+        .toList();
   }
 }