You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:23 UTC

[incubator-streampipes] 02/03: introduce state to processors

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8e6390da5fd4a2fcd459b39e66ce0b4e96a63e71
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:06:51 2021 +0100

    introduce state to processors
---
 .../api/InvocablePipelineElementResource.java      |  36 +++++
 .../SemanticEventProcessingAgentDeclarer.java      |   3 +-
 .../model/base/ConsumableStreamPipesEntity.java    |  17 +++
 .../model/base/InvocableStreamPipesEntity.java     |  15 +++
 .../model/graph/DataProcessorInvocation.java       |   2 +
 .../controller/api/InvocableEntityResource.java    |  25 ++++
 .../management/pe/IPipelineElementLifeCycle.java   |   4 +
 .../pe/PipelineElementLifeCycleState.java          |   2 +
 .../management/pe/PipelineElementManager.java      |  10 ++
 .../pe/handler/PipelineElementStateHandler.java    | 146 +++++++++++++++++++++
 .../node/controller/utils/HttpUtils.java           |  17 +++
 .../builder/AbstractProcessingElementBuilder.java  |   5 +
 .../apache/streampipes/vocabulary/StreamPipes.java |   1 +
 .../wrapper/spark/SparkDataProcessorDeclarer.java  |  10 ++
 .../standalone/StreamPipesDataProcessor.java       |  14 ++
 .../standalone/state/DefaultStateCollector.java    |  92 +++++++++++++
 .../wrapper/standalone/state/ObjectTypeTuple.java  |  31 +++--
 .../wrapper/standalone/state/StateObject.java      |  18 +--
 .../wrapper/declarer/EventProcessorDeclarer.java   |   9 ++
 .../state/serializer/JacksonStateSerializer.java   |  57 ++++++++
 .../wrapper/state/serializer/StateSerializer.java  |  15 +--
 ui/src/app/core-model/gen/streampipes-model.ts     |   2 +
 22 files changed, 502 insertions(+), 29 deletions(-)

diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index 0b55885..578de6d 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.container.api;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.container.declarer.InvocableDeclarer;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -150,6 +151,41 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
         return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
     }
 
+
+    @GET
+    @Path("{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response getState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+        // Looks up the running instance and, if it is a data processor, returns its serialized state.
+        InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+        // instanceof is false for null, so an unknown id also ends in the failure response.
+        if (!(runningInstance instanceof SemanticEventProcessingAgentDeclarer)) {
+            return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+        }
+
+        // The state string is passed on exactly as the processor returned it.
+        String serializedState = ((SemanticEventProcessingAgentDeclarer) runningInstance).getState();
+        return ok(new Response(elementId, true, serializedState));
+    }
+
+    @PUT
+    @Path("{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response setState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId, String payload) {
+        // Hands the request body to the running data processor as its new state.
+        InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+        // instanceof is false for null, so an unknown id also ends in the failure response.
+        if (!(runningInstance instanceof SemanticEventProcessingAgentDeclarer)) {
+            return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+        }
+
+        ((SemanticEventProcessingAgentDeclarer) runningInstance).setState(payload);
+        return ok(new Response(elementId, true, "State update successful."));
+    }
+
+
     protected abstract P getExtractor(I graph);
 
     protected abstract I createGroundingDebugInformation(I graph);
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
index 8ae0aff..9c43470 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
@@ -22,5 +22,6 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 
 public interface SemanticEventProcessingAgentDeclarer extends InvocableDeclarer<DataProcessorDescription, DataProcessorInvocation>{
-	
+	void setState(String state);
+    String getState();
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
index 6236014..39bf423 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
@@ -66,11 +66,26 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
   @RdfProperty(StreamPipes.ELEMENT_ENDPOINT_SERVICE_NAME)
   private String elementEndpointServiceName;
 
+  // Flags the element as stateful (set via setStateful). A primitive boolean is a plain
+  // literal, not an entity relation, so it carries @RdfProperty only and no @OneToOne mapping.
+  @RdfProperty(StreamPipes.IS_STATEFUL)
+  private boolean isStateful;
+
+
+  public boolean isStateful() {
+    return isStateful;
+  }
+
+  public void setStateful(boolean stateful) {
+    isStateful = stateful;
+  }
+
   public ConsumableStreamPipesEntity() {
     super();
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
     this.resourceRequirements = new ArrayList<>();
+    this.setStateful(false);
   }
 
   public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
@@ -78,6 +93,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
     this.resourceRequirements = new ArrayList<>();
+    this.setStateful(false);
   }
 
   public ConsumableStreamPipesEntity(ConsumableStreamPipesEntity other) {
@@ -95,6 +111,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     if (other.getResourceRequirements() != null) {
       this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
     }
+    this.isStateful = other.isStateful;
   }
 
   public List<SpDataStream> getSpDataStreams() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 08c25a8..de46346 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -108,12 +108,27 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
 
   private boolean uncompleted;
 
+  // Flags the invocation as stateful (set via setStateful). A primitive boolean is a plain
+  // literal, not an entity relation, so it carries @RdfProperty only and no @OneToOne mapping.
+  @RdfProperty(StreamPipes.IS_STATEFUL)
+  private boolean isStateful;
+
+
+  public boolean isStateful() {
+    return isStateful;
+  }
+
+  public void setStateful(boolean stateful) {
+    isStateful = stateful;
+  }
+
   public InvocableStreamPipesEntity() {
     super();
   }
 
   public InvocableStreamPipesEntity(InvocableStreamPipesEntity other) {
     super(other);
+    this.setStateful(other.isStateful());
     this.belongsTo = other.getBelongsTo();
     this.correspondingPipeline = other.getCorrespondingPipeline();
     this.inputStreams = new Cloner().streams(other.getInputStreams());
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 1e7a4d6..3adb822 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -87,6 +87,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.setOutputStreamRelays(sepa.getOutputStreamRelays());
     this.setEventRelayStrategy(sepa.getEventRelayStrategy());
     this.setResourceRequirements(sepa.getResourceRequirements());
+    this.setStateful(sepa.isStateful());
 
     //this.setUri(belongsTo +"/" +getElementId());
   }
@@ -103,6 +104,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.pathName = other.getPathName();
     this.eventRelayStrategy = other.getEventRelayStrategy();
     this.category = new Cloner().epaTypes(other.getCategory());
+    this.setStateful(other.isStateful());
     if (other.getResourceRequirements() != null) {
       this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index f950fa5..d1e4399 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -91,6 +91,31 @@ public class InvocableEntityResource extends AbstractResource {
         return ok(resp);
     }
 
+    @GET
+    @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response getState(@PathParam("identifier") String identifier,
+                                            @PathParam("elementId") String elementId,
+                                            @PathParam("runningInstanceId") String runningInstanceId) {
+        InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+        Response resp = PipelineElementManager.getInstance().getState(graph, runningInstanceId);
+
+        return ok(resp);
+    }
+
+    @PUT
+    @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response setState(@PathParam("identifier") String identifier,
+                                              @PathParam("elementId") String elementId,
+                                              @PathParam("runningInstanceId") String runningInstanceId,
+                                              String state) {
+        InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+        Response resp = PipelineElementManager.getInstance().setState(graph, runningInstanceId, state);
+
+        return ok(resp);
+    }
+
     // Live-Reconfiguration
     @POST
     @JacksonSerialized
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
index 3daa469..cbd3194 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
@@ -30,6 +30,10 @@ public interface IPipelineElementLifeCycle {
 
     Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
 
+    Response getState(InvocableStreamPipesEntity graph, String runningInstanceId);
+
+    Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state);
+
     Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
 
     Response offload(InvocableStreamPipesEntity graph);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
index bad4511..1f1edb5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
@@ -23,5 +23,7 @@ public enum PipelineElementLifeCycleState {
     DETACH,
     RECONFIGURE,
     OFFLOAD,
+    GETSTATE,
+    SETSTATE,
     DEREGISTER
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
index 7927814..0e87da4 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
@@ -80,6 +80,16 @@ public class PipelineElementManager implements IPipelineElementLifeCycle {
     }
 
     @Override
+    public Response getState(InvocableStreamPipesEntity graph, String runningInstanceId) {
+        return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.GETSTATE, runningInstanceId).handle();
+    }
+
+    @Override
+    public Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state) {
+        return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.SETSTATE, runningInstanceId, state).handle();
+    }
+
+    @Override
     public void deregister(){
         // TODO: unregister element from Consul
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
new file mode 100644
index 0000000..b661f56
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.pe.handler;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.node.controller.management.IHandler;
+import org.apache.streampipes.node.controller.management.pe.PipelineElementLifeCycleState;
+import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Reads or writes the state of a running pipeline element by calling its state endpoint,
+ * retrying every RETRY_INTERVAL_MS milliseconds until the element reports success.
+ * NOTE(review): retries are unbounded, so a permanently failing element blocks the caller.
+ */
+public class PipelineElementStateHandler implements IHandler<Response> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineElementStateHandler.class.getCanonicalName());
+    private static final String SLASH = "/";
+    private static final String STATE = "state";
+    private static final long RETRY_INTERVAL_MS = 5000;
+
+    private final InvocableStreamPipesEntity graph;
+    private final PipelineElementLifeCycleState type;
+    private final String runningInstanceId;
+    private final String state;
+
+    /** Creates a handler without a state payload; an empty JSON object is used as the payload. */
+    public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId) {
+        this(graph, type, runningInstanceId, "{}");
+    }
+
+    /** Creates a handler; {@code state} is the body sent to the pipeline element for SETSTATE. */
+    public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId, String state) {
+        this.graph = graph;
+        this.type = type;
+        this.runningInstanceId = runningInstanceId;
+        this.state = state;
+    }
+
+    /** Runs the requested state operation; a missing graph yields a failure response, other steps throw. */
+    @Override
+    public Response handle() {
+        if (graph == null) {
+            // Without a graph there is no endpoint URL to build; report failure instead of a NullPointerException.
+            Response notFound = new Response();
+            notFound.setSuccess(false);
+            notFound.setOptionalMessage("Could not find the running instance with id: " + runningInstanceId);
+            return notFound;
+        }
+        switch(type) {
+            case GETSTATE:
+                return getState();
+            case SETSTATE:
+                return setState();
+            default:
+                throw new SpRuntimeException("Life cycle step not supported: " + type);
+        }
+    }
+
+    /** Requests the element's state endpoint until a successful response arrives and returns it. */
+    private Response getState(){
+        String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+        LOG.info("Trying to get state of pipeline element: {}", url);
+        Response response = HttpUtils.get(url, Response.class);
+        while (!response.isSuccess()) {
+            waitBeforeRetry("Failed to get State of pipeline element: " + url);
+            response = HttpUtils.get(url, Response.class);
+        }
+        LOG.info("Successfully retrieved state from pipeline element {}", url);
+        return response;
+    }
+
+    /** Sends the state payload to the element's state endpoint until a successful response arrives. */
+    private Response setState(){
+        String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+        LOG.info("Trying to set state of pipeline element: {}", url);
+        Response response = HttpUtils.putAndRespond(url, state);
+        while (!response.isSuccess()) {
+            waitBeforeRetry("Failed to set State of pipeline element: " + url);
+            response = HttpUtils.putAndRespond(url, state);
+        }
+        LOG.info("Successfully set state of pipeline element {}", url);
+        return response;
+    }
+
+    /** Sleeps for the retry interval; on interruption restores the flag and throws with {@code failureMessage}. */
+    private static void waitBeforeRetry(String failureMessage) {
+        LOG.debug("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+        try {
+            Thread.sleep(RETRY_INTERVAL_MS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(failureMessage, e);
+        }
+    }
+
+    //TODO: Optimize compression and move compression to PipelineElement Container (Brotli or Zstandard)
+    //Preliminary compression logic copied from https://gist.github.com/yfnick/227e0c12957a329ad138
+    /** GZIP-compresses the UTF-8 bytes of {@code data}. */
+    private static byte[] compress(String data) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
+        // The GZIP stream must be closed before the buffer is read so that all compressed data is flushed.
+        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
+            // Encode explicitly as UTF-8 to match decompress(); the platform default charset may differ.
+            gzip.write(data.getBytes("UTF-8"));
+        }
+        return bos.toByteArray();
+    }
+
+    /** Decompresses GZIP data and decodes the result as UTF-8. */
+    private static String decompress(final byte[] compressed) throws IOException {
+        try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
+            return new String(IOUtils.toByteArray(gis), "UTF-8");
+        }
+    }
+
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
index f1e0536..378bf0b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -126,6 +126,23 @@ public class HttpUtils {
         }
     }
 
+    //TODO: Naming is provisional; rename
+    /** Sends {@code body} as JSON via HTTP PUT and deserializes the reply; on an IOException the result has success=false. */
+    public static org.apache.streampipes.model.Response putAndRespond(String url, String body) {
+        org.apache.streampipes.model.Response response = new org.apache.streampipes.model.Response();
+        // Pessimistic default that is returned unchanged when the request fails.
+        response.setSuccess(false);
+        try {
+            response = deserialize(Request.Put(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute(), org.apache.streampipes.model.Response.class);
+        } catch (IOException e) {
+            LOG.error("Something went wrong during PUT request", e);
+        }
+        return response;
+    }
+
     public static String post(String url, String body) {
         try {
             return Request.Post(url)
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index e8d2f6b..c93ea54 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -272,6 +272,11 @@ public abstract class AbstractProcessingElementBuilder<BU extends
     return me();
   }
 
+  public BU containsState(){
+    this.elementDescription.setStateful(true);
+    return me();
+  }
+
 
   @Override
   public void prepareBuild() {
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 81fd4bd..d22e6d3 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -508,4 +508,5 @@ public class StreamPipes {
   public static final String HAS_CONTAINER_ENV_KEY = NS + "hasContainerEnvKey";
   public static final String HAS_CONTAINER_ENV_VALUE = NS + "hasContainerEnvValue";
     public static final String HAS_DEPLOYMENT_CONTAINER = NS + "hasDeploymentContainer";
+    public static final String IS_STATEFUL = NS + "isStateful";
 }
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
index 91fbf57..d07a3d4 100644
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
@@ -25,4 +25,14 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 
 public abstract class SparkDataProcessorDeclarer<B extends EventProcessorBindingParams>
     extends AbstractSparkDeclarer<DataProcessorDescription, DataProcessorInvocation, SparkDataProcessorRuntime> implements SemanticEventProcessingAgentDeclarer {
+    //If not overwritten elements are regarded as stateless
+    //TODO: Assess if this class is needed in its current form as it is not used in extensions atm
+    @Override
+    public void setState(String state) {
+    }
+
+    @Override
+    public String getState() {
+        return null;
+    }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde02..07ee0cb 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -21,15 +21,29 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.standalone.state.DefaultStateCollector;
 
 import java.util.function.Supplier;
 
 public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
 
+  protected DefaultStateCollector stateCollector = null;
+
   @Override
   public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
     Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
     return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
   }
 
+  @Override
+  public String getState() {
+    // A processor that never assigned a state collector has no state to report.
+    DefaultStateCollector collector = this.stateCollector;
+    return collector == null ? null : collector.getState();
+  }
+
+  @Override
+  public void setState(String state) {
+    if (this.stateCollector != null) { this.stateCollector.setState(state); } // no collector: nothing to restore
+  }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
new file mode 100644
index 0000000..7683f4e
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.state;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.streampipes.wrapper.state.serializer.JacksonStateSerializer;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Collects the {@link StateObject}-annotated fields of an object and (de)serializes them as its state. */
+public class DefaultStateCollector {
+    private final ArrayList<Field> fields;
+    private final HashMap<String, Field> fieldsMap;
+    private final Object obj;
+    private final StateSerializer serializer;
+
+    /** Creates a collector for {@code o} that uses a {@link JacksonStateSerializer}. */
+    public DefaultStateCollector(Object o){
+        this(o, new JacksonStateSerializer());
+    }
+
+    /** Registers the {@link StateObject} fields declared directly on {@code o}'s class, keyed by field name. */
+    public DefaultStateCollector(Object o, StateSerializer serializer){
+        this.obj = o;
+        this.serializer = serializer;
+        this.fields = new ArrayList<>();
+        this.fieldsMap = new HashMap<>();
+        for(Field f : o.getClass().getDeclaredFields()){
+            if(f.getAnnotation(StateObject.class) != null){
+                f.setAccessible(true);
+                this.fields.add(f);
+                this.fieldsMap.put(f.getName(), f);
+            }
+        }
+    }
+
+    /** Restores the annotated fields from {@code state}; entries without a matching field are skipped. */
+    public void setState(String state){
+        Type t = new TypeReference<HashMap<String, ObjectTypeTuple>>(){}.getType();
+        String mapTypeSignature = serializer.getTypeSignature(t);
+        HashMap<String, ObjectTypeTuple> map = serializer.deserialize(state, mapTypeSignature);
+        for(Map.Entry<String, ObjectTypeTuple> entry : map.entrySet()){
+            Field target = this.fieldsMap.get(entry.getKey());
+            if(target == null){
+                continue;
+            }
+            try {
+                target.set(this.obj, serializer.deserialize(
+                        entry.getValue().getObjectSerialization(), entry.getValue().getGenericTypeSignature()));
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /** Serializes all non-null annotated fields into one state string, keyed by field name. */
+    public String getState()  {
+        Map<String, ObjectTypeTuple> list = new HashMap<>();
+        for(Field f : this.fields){
+            try {
+                Object value = f.get(this.obj);
+                if(value != null){
+                    list.put(f.getName(), new ObjectTypeTuple(value, this.serializer, f.getGenericType()));
+                }
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+        }
+        return serializer.serialize(list);
+    }
+}
\ No newline at end of file
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
similarity index 51%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
index 3daa469..343d738 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
@@ -15,25 +15,32 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.standalone.state;
 
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+import java.lang.reflect.Type;
 
-public interface IPipelineElementLifeCycle {
+public class ObjectTypeTuple { // pairs one serialized value with the type signature recorded for it
 
-    void register(InvocableRegistration registration);
+    private String genericTypeSignature; // result of serializer.getTypeSignature(type) for the value's type
+    private String objectSerialization; // result of serializer.serialize(obj) for the value itself
 
-    Response invoke(InvocableStreamPipesEntity graph);
+    //Default constructor needed for Jackson deserialization
+    public ObjectTypeTuple(){}
 
-    Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
+    public ObjectTypeTuple(Object obj, StateSerializer serializer, Type type){ // computes both strings eagerly; the serializer itself is not retained
+        this.genericTypeSignature = serializer.getTypeSignature(type);
+        this.objectSerialization = serializer.serialize(obj);
+    }
 
-    Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
+    public String getGenericTypeSignature(){ // type signature stored at construction (unset when built via the default constructor)
+        return this.genericTypeSignature;
+    }
+
+    public String getObjectSerialization(){ // serialized form stored at construction (unset when built via the default constructor)
+        return this.objectSerialization;
+    }
 
-    Response offload(InvocableStreamPipesEntity graph);
 
-    void deregister();
 
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
similarity index 71%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
index bad4511..3c0554c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
 
-public enum PipelineElementLifeCycleState {
-    REGISTER,
-    INVOKE,
-    DETACH,
-    RECONFIGURE,
-    OFFLOAD,
-    DEREGISTER
+package org.apache.streampipes.wrapper.standalone.state;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME) // retained at runtime, so the annotation is visible through reflection
+@Target({ElementType.FIELD}) // may only be placed on fields
+public @interface StateObject { // member-less marker; presumably flags fields that belong to an element's state - TODO confirm against the state collector
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
index fa8c5bf..9a54f54 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
@@ -44,4 +44,13 @@ public abstract class EventProcessorDeclarer<B extends EventProcessorBindingPara
 		return invokeEPRuntime(graph);
 	}
 
+	//If not overridden, elements are regarded as stateless
+	@Override
+	public void setState(String state) { // default no-op: the supplied state string is ignored
+	}
+
+	@Override
+	public String getState() { // default: there is no state to report
+		return null; // callers receive null unless a subclass overrides this method
 	}
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
new file mode 100644
index 0000000..88f55ac
--- /dev/null
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.state.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+
+import java.lang.reflect.Type;
+
+public class JacksonStateSerializer implements StateSerializer {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    public String serialize(Object o) {
+        try {
+            return objectMapper.writeValueAsString(o);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    @Override
+    public <T> T deserialize(String serializedObject, String typeSignature) {
+        TypeFactory typeFactory = objectMapper.getTypeFactory();
+        JavaType inputType = typeFactory.constructFromCanonical(typeSignature);
+        try {
+            return objectMapper.readValue(serializedObject, inputType);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    @Override
+    public String getTypeSignature(Type type) {
+        return objectMapper.constructType(type).toCanonical();
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
similarity index 75%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
index bad4511..3807fcc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.state.serializer;
 
-public enum PipelineElementLifeCycleState {
-    REGISTER,
-    INVOKE,
-    DETACH,
-    RECONFIGURE,
-    OFFLOAD,
-    DEREGISTER
+import java.lang.reflect.Type;
+
+public interface StateSerializer { // converts objects to strings and back, using a string type signature to name the target type
+    String serialize(Object o); // returns the string form of o
+    <T> T deserialize(String serializedObject, String typeSignature); // rebuilds an object from its string form; typeSignature names the type to produce
+    String getTypeSignature(Type type); // returns a string describing type, usable as the typeSignature argument of deserialize
 }
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f0e9657..9a7f187 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -1083,6 +1083,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
     preemption: boolean;
     priorityScore: number;
     resourceRequirements: NodeResourceRequirementUnion[];
+    stateful: boolean;
     staticProperties: StaticPropertyUnion[];
     statusInfoSettings: ElementStatusInfoSettings;
     streamRequirements: SpDataStreamUnion[];
@@ -1115,6 +1116,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
         instance.preemption = data.preemption;
         instance.configured = data.configured;
         instance.uncompleted = data.uncompleted;
+        instance.stateful = data.stateful;
         return instance;
     }
 }