You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:21 UTC

[incubator-streampipes] branch edge-extensions updated (d03e36a -> d304587)

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from d03e36a  latest changes to performance evaluation
     new 50354ab  improve relay buffer performance
     new 8e6390d  introduce state to processors
     new d304587  preliminary stateful migration implementation

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/InvocablePipelineElementResource.java      | 36 ++++++++
 .../SemanticEventProcessingAgentDeclarer.java      |  3 +-
 .../model/base/ConsumableStreamPipesEntity.java    | 17 ++++
 .../model/base/InvocableStreamPipesEntity.java     | 15 ++++
 .../model/graph/DataProcessorInvocation.java       |  2 +
 .../controller/api/InvocableEntityResource.java    | 25 ++++++
 .../management/pe/IPipelineElementLifeCycle.java   |  4 +
 .../pe/PipelineElementLifeCycleState.java          |  2 +
 .../management/pe/PipelineElementManager.java      | 10 +++
 ...ndler.java => PipelineElementStateHandler.java} | 80 ++++++++++++-----
 .../relay/bridges/MultiBrokerBridge.java           | 63 +++++++-------
 .../node/controller/utils/HttpUtils.java           | 17 ++++
 .../execution/http/StateEndpointUrlGenerator.java  | 75 ++++++++++++++++
 .../manager/execution/http/StateSubmitter.java     | 99 ++++++++++++++++++++++
 .../pipeline/AbstractPipelineExecutor.java         | 10 +++
 .../pipeline/PipelineMigrationExecutor.java        | 82 ++++++++++++++++++
 .../builder/AbstractProcessingElementBuilder.java  |  5 ++
 .../apache/streampipes/vocabulary/StreamPipes.java |  1 +
 .../wrapper/spark/SparkDataProcessorDeclarer.java  | 10 +++
 .../standalone/StreamPipesDataProcessor.java       | 14 +++
 .../standalone/state/DefaultStateCollector.java    | 92 ++++++++++++++++++++
 .../wrapper/standalone/state/ObjectTypeTuple.java  | 37 ++++----
 .../wrapper/standalone/state/StateObject.java      |  7 +-
 .../wrapper/declarer/EventProcessorDeclarer.java   |  9 ++
 .../state/serializer/JacksonStateSerializer.java   | 57 +++++++++++++
 .../wrapper/state/serializer/StateSerializer.java  |  9 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  2 +
 27 files changed, 709 insertions(+), 74 deletions(-)
 copy streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/{PipelineElementInteractionHandler.java => PipelineElementStateHandler.java} (52%)
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
 create mode 100644 streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
 copy streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/serializer/GsonJerseyProvider.java => streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java (53%)
 copy streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/annotation/NoAuthenticationRequired.java => streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java (88%)
 create mode 100644 streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java => streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java (75%)

[incubator-streampipes] 01/03: improve relay buffer performance

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 50354ab8664470a115ac7897743164b5c02b5890
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 19:56:38 2021 +0100

    improve relay buffer performance
---
 .../relay/bridges/MultiBrokerBridge.java           | 63 +++++++++++-----------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 9db1d75..6591759 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -30,9 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.function.Supplier;
 
@@ -46,13 +44,12 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
     private final EventConsumer<T1> consumer;
     private final EventProducer<T2> producer;
     private final EventRelayStrategy eventRelayStrategy;
-    private final Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
-    //private CircularFifoBuffer buffer = new CircularFifoBuffer(NodeConfiguration.getEventRelayBufferSize());
+    private Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
 
-    private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
     private final Tuple3<String, Integer, String> relayInfo;
-    private boolean isBuffering = false;
-    private boolean targetAlive = true;
+    private volatile boolean isBuffering = false;
+    private volatile boolean targetAlive = true;
+    private volatile boolean isEmptyingBuffer = false;
 
     public MultiBrokerBridge(TransportProtocol sourceProtocol, TransportProtocol targetProtocol,
                              String eventRelayStrategy, Supplier<EventConsumer<T1>> consumerSupplier,
@@ -98,20 +95,20 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
             startAliveThread();
         }
         // check if target broker can be reached
-        if (targetAlive) {
+        if (targetAlive && !isEmptyingBuffer) {
 
             if(!eventBuffer.isEmpty()){
-                publishBufferedEvents(eventBuffer);
-                publishEvent(event);
-            }else{
+                bufferEvent(event);
+                publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+                eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
+            } else {
                 publishEvent(event);
             }
 
         } else {
-            //
             if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
                 // add event to buffer
-                if(!isBuffering) {
+                if(!isBuffering && !isEmptyingBuffer) {
                     LOG.info("Connection issue to broker={}:{}. Buffer events for topic={}", relayInfo.a, relayInfo.b
                             , relayInfo.c);
                     isBuffering = true;
@@ -134,7 +131,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         metrics.increaseNumRelayedEvents();
     }
 
-    private synchronized void bufferEvent(byte[] event){
+    private void bufferEvent(byte[] event){
         if (!eventBuffer.offer(event)){
             eventBuffer.poll();
             eventBuffer.offer(event);
@@ -147,23 +144,25 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                 relayInfo.c);
     }
 
-    private synchronized void publishBufferedEvents(Queue<byte[]> eventBuffer){
-        LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
-                        "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
-                relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
+    private void publishBufferedEvents(Queue<byte[]> buffer, long numDroppedEvents){
+        isEmptyingBuffer = true;
+        LOG.info("Resent buffered events for topic={} (buffer_size={}, num_dropped_events={})", relayInfo.c,
+                buffer.size(), numDroppedEvents);
 
-        // add current event from callback
-        eventBuffer.forEach(e -> {
-            try{
-                producer.publish(e);
-                metrics.increaseNumRelayedEvents();
-            }  catch (Exception ex) {
-                LOG.error(ex.toString());
-            }
+        Thread th = new Thread(() -> {
+            buffer.forEach(e -> {
+                try{
+                    publishEvent(e);
+                }  catch (Exception ex) {
+                    LOG.error(ex.toString());
+                }
+            });
+            metrics.clearNumDroppedEvents();
+            isBuffering = false;
+
+            isEmptyingBuffer = false;
         });
-        eventBuffer.clear();
-        metrics.clearNumDroppedEvents();
-        isBuffering = false;
+        th.start();
     }
 
 
@@ -231,7 +230,9 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                     if(isTargetBrokerAlive())
                         isAlive = true;
                 }
-                publishBufferedEvents(eventBuffer);
+                LOG.info("Re-established connection to broker={}:{}", relayInfo.a, relayInfo.b);
+                publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+                eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
                 targetAlive = true;
             }
         };

[incubator-streampipes] 02/03: introduce state to processors

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8e6390da5fd4a2fcd459b39e66ce0b4e96a63e71
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:06:51 2021 +0100

    introduce state to processors
---
 .../api/InvocablePipelineElementResource.java      |  36 +++++
 .../SemanticEventProcessingAgentDeclarer.java      |   3 +-
 .../model/base/ConsumableStreamPipesEntity.java    |  17 +++
 .../model/base/InvocableStreamPipesEntity.java     |  15 +++
 .../model/graph/DataProcessorInvocation.java       |   2 +
 .../controller/api/InvocableEntityResource.java    |  25 ++++
 .../management/pe/IPipelineElementLifeCycle.java   |   4 +
 .../pe/PipelineElementLifeCycleState.java          |   2 +
 .../management/pe/PipelineElementManager.java      |  10 ++
 .../pe/handler/PipelineElementStateHandler.java    | 146 +++++++++++++++++++++
 .../node/controller/utils/HttpUtils.java           |  17 +++
 .../builder/AbstractProcessingElementBuilder.java  |   5 +
 .../apache/streampipes/vocabulary/StreamPipes.java |   1 +
 .../wrapper/spark/SparkDataProcessorDeclarer.java  |  10 ++
 .../standalone/StreamPipesDataProcessor.java       |  14 ++
 .../standalone/state/DefaultStateCollector.java    |  92 +++++++++++++
 .../wrapper/standalone/state/ObjectTypeTuple.java  |  31 +++--
 .../wrapper/standalone/state/StateObject.java      |  18 +--
 .../wrapper/declarer/EventProcessorDeclarer.java   |   9 ++
 .../state/serializer/JacksonStateSerializer.java   |  57 ++++++++
 .../wrapper/state/serializer/StateSerializer.java  |  15 +--
 ui/src/app/core-model/gen/streampipes-model.ts     |   2 +
 22 files changed, 502 insertions(+), 29 deletions(-)

diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index 0b55885..578de6d 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.container.api;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.declarer.Declarer;
 import org.apache.streampipes.container.declarer.InvocableDeclarer;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
 import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -150,6 +151,41 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
         return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
     }
 
+
+    @GET
+    @Path("{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response getState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+
+        InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+
+        if (runningInstance != null && runningInstance instanceof SemanticEventProcessingAgentDeclarer) {
+            String serializedState = ((SemanticEventProcessingAgentDeclarer) runningInstance).getState();
+            
+            return ok(new Response(elementId, true, serializedState));
+        }
+
+        return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+    }
+
+    @PUT
+    @Path("{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response setState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId, String payload) {
+
+        InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+
+        if (runningInstance != null && runningInstance instanceof SemanticEventProcessingAgentDeclarer) {
+            ((SemanticEventProcessingAgentDeclarer) runningInstance).setState(payload);
+            return ok(new Response(elementId, true, "State update successful."));
+        }
+
+        return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+    }
+
+
     protected abstract P getExtractor(I graph);
 
     protected abstract I createGroundingDebugInformation(I graph);
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
index 8ae0aff..9c43470 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
@@ -22,5 +22,6 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 
 public interface SemanticEventProcessingAgentDeclarer extends InvocableDeclarer<DataProcessorDescription, DataProcessorInvocation>{
-	
+	void setState(String state);
+    String getState();
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
index 6236014..39bf423 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
@@ -66,11 +66,26 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
   @RdfProperty(StreamPipes.ELEMENT_ENDPOINT_SERVICE_NAME)
   private String elementEndpointServiceName;
 
+  @OneToOne(fetch = FetchType.EAGER,
+          cascade = {CascadeType.ALL})
+  @RdfProperty(StreamPipes.IS_STATEFUL)
+  private boolean isStateful;
+
+
+  public boolean isStateful() {
+    return isStateful;
+  }
+
+  public void setStateful(boolean stateful) {
+    isStateful = stateful;
+  }
+
   public ConsumableStreamPipesEntity() {
     super();
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
     this.resourceRequirements = new ArrayList<>();
+    this.setStateful(false);
   }
 
   public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
@@ -78,6 +93,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
     this.resourceRequirements = new ArrayList<>();
+    this.setStateful(false);
   }
 
   public ConsumableStreamPipesEntity(ConsumableStreamPipesEntity other) {
@@ -95,6 +111,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     if (other.getResourceRequirements() != null) {
       this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
     }
+    this.isStateful = other.isStateful;
   }
 
   public List<SpDataStream> getSpDataStreams() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 08c25a8..de46346 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -108,12 +108,27 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
 
   private boolean uncompleted;
 
+  @OneToOne(fetch = FetchType.EAGER,
+          cascade = {CascadeType.ALL})
+  @RdfProperty(StreamPipes.IS_STATEFUL)
+  private boolean isStateful;
+
+
+  public boolean isStateful() {
+    return isStateful;
+  }
+
+  public void setStateful(boolean stateful) {
+    isStateful = stateful;
+  }
+
   public InvocableStreamPipesEntity() {
     super();
   }
 
   public InvocableStreamPipesEntity(InvocableStreamPipesEntity other) {
     super(other);
+    this.setStateful(other.isStateful());
     this.belongsTo = other.getBelongsTo();
     this.correspondingPipeline = other.getCorrespondingPipeline();
     this.inputStreams = new Cloner().streams(other.getInputStreams());
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 1e7a4d6..3adb822 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -87,6 +87,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.setOutputStreamRelays(sepa.getOutputStreamRelays());
     this.setEventRelayStrategy(sepa.getEventRelayStrategy());
     this.setResourceRequirements(sepa.getResourceRequirements());
+    this.setStateful(sepa.isStateful());
 
     //this.setUri(belongsTo +"/" +getElementId());
   }
@@ -103,6 +104,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.pathName = other.getPathName();
     this.eventRelayStrategy = other.getEventRelayStrategy();
     this.category = new Cloner().epaTypes(other.getCategory());
+    this.setStateful(other.isStateful());
     if (other.getResourceRequirements() != null) {
       this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index f950fa5..d1e4399 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -91,6 +91,31 @@ public class InvocableEntityResource extends AbstractResource {
         return ok(resp);
     }
 
+    @GET
+    @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response getState(@PathParam("identifier") String identifier,
+                                            @PathParam("elementId") String elementId,
+                                            @PathParam("runningInstanceId") String runningInstanceId) {
+        InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+        Response resp = PipelineElementManager.getInstance().getState(graph, runningInstanceId);
+
+        return ok(resp);
+    }
+
+    @PUT
+    @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response setState(@PathParam("identifier") String identifier,
+                                              @PathParam("elementId") String elementId,
+                                              @PathParam("runningInstanceId") String runningInstanceId,
+                                              String state) {
+        InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+        Response resp = PipelineElementManager.getInstance().setState(graph, runningInstanceId, state);
+
+        return ok(resp);
+    }
+
     // Live-Reconfiguration
     @POST
     @JacksonSerialized
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
index 3daa469..cbd3194 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
@@ -30,6 +30,10 @@ public interface IPipelineElementLifeCycle {
 
     Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
 
+    Response getState(InvocableStreamPipesEntity graph, String runningInstanceId);
+
+    Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state);
+
     Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
 
     Response offload(InvocableStreamPipesEntity graph);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
index bad4511..1f1edb5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
@@ -23,5 +23,7 @@ public enum PipelineElementLifeCycleState {
     DETACH,
     RECONFIGURE,
     OFFLOAD,
+    GETSTATE,
+    SETSTATE,
     DEREGISTER
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
index 7927814..0e87da4 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
@@ -80,6 +80,16 @@ public class PipelineElementManager implements IPipelineElementLifeCycle {
     }
 
     @Override
+    public Response getState(InvocableStreamPipesEntity graph, String runningInstanceId) {
+        return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.GETSTATE, runningInstanceId).handle();
+    }
+
+    @Override
+    public Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state) {
+        return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.SETSTATE, runningInstanceId, state).handle();
+    }
+
+    @Override
     public void deregister(){
         // TODO: unregister element from Consul
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
new file mode 100644
index 0000000..b661f56
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.pe.handler;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.node.controller.management.IHandler;
+import org.apache.streampipes.node.controller.management.pe.PipelineElementLifeCycleState;
+import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class PipelineElementStateHandler implements IHandler<Response> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineElementInteractionHandler.class.getCanonicalName());
+    private static final String SLASH = "/";
+    private static final String STATE = "state";
+    private static final long RETRY_INTERVAL_MS = 5000;
+
+    private final InvocableStreamPipesEntity graph;
+    private final PipelineElementLifeCycleState type;
+    private final String runningInstanceId;
+    private final String state;
+
+
+    public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId) {
+        this(graph, type, runningInstanceId, "{}");
+    }
+
+    public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId, String state) {
+        this.graph = graph;
+        this.type = type;
+        this.runningInstanceId = runningInstanceId;
+        this.state = state;
+    }
+
+
+    @Override
+    public Response handle() {
+        switch(type) {
+            case GETSTATE:
+                return getState();
+            case SETSTATE:
+                return setState();
+            default:
+                throw new SpRuntimeException("Life cycle step not supported" + type);
+        }
+    }
+
+
+    private Response getState(){
+        Response response = new Response();
+        String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+        LOG.info("Trying to get state of pipeline element: {}", url);
+        boolean connected = false;
+        while (!connected) {
+
+            response = HttpUtils.get(url, Response.class);
+            connected = response.isSuccess();
+
+            response.setOptionalMessage(response.getOptionalMessage());
+
+            if (!connected) {
+                LOG.debug("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Failed to get State of pipeline element: " + url, e);
+                }
+            }
+        }
+        LOG.info("Successfully retrieved state from pipeline element {}", url);
+        return response;
+    }
+
+    private Response setState(){
+        Response response = new Response();
+        String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+        LOG.info("Trying to set state of pipeline element: {}", url);
+        boolean connected = false;
+        while (!connected) {
+
+
+            response = HttpUtils.putAndRespond(url, state);
+
+            connected = response.isSuccess();
+
+            if (!connected) {
+                LOG.debug("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Failed to set State of pipeline element: " + url, e);
+                }
+            }
+        }
+        LOG.info("Successfully retrieved state from pipeline element {}", url);
+        return response;
+    }
+
+
+    //TODO: Optimize compression and move compression to PipelineElement Container (Brotli or Zstandard)
+    //Preliminary compression logic copied from https://gist.github.com/yfnick/227e0c12957a329ad138
+    private static byte[] compress(String data) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
+        GZIPOutputStream gzip = new GZIPOutputStream(bos);
+        gzip.write(data.getBytes());
+        gzip.close();
+        byte[] compressed = bos.toByteArray();
+        bos.close();
+        return compressed;
+    }
+
+    private static String decompress(final byte[] compressed) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+        GZIPInputStream gis = new GZIPInputStream(bis);
+        byte[] bytes = IOUtils.toByteArray(gis);
+        return new String(bytes, "UTF-8");
+    }
+
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
index f1e0536..378bf0b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -126,6 +126,23 @@ public class HttpUtils {
         }
     }
 
+    //TODO: Naming is provisional; rename
+    /**
+     * Sends {@code body} as a JSON PUT request to {@code url} and deserializes the reply into a
+     * StreamPipes {@code Response}.
+     *
+     * @param url  target endpoint
+     * @param body request payload, sent with content type application/json
+     * @return the deserialized reply, or a response with {@code success == false} if the request
+     *         fails with an {@link IOException}
+     */
+    public static org.apache.streampipes.model.Response putAndRespond(String url, String body) {
+        try {
+            return deserialize(Request.Put(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute(), org.apache.streampipes.model.Response.class);
+        } catch (IOException e) {
+            LOG.error("Something went wrong during PUT request", e);
+            // the failure response is only needed on this path
+            org.apache.streampipes.model.Response failure = new org.apache.streampipes.model.Response();
+            failure.setSuccess(false);
+            return failure;
+        }
+    }
+
     public static String post(String url, String body) {
         try {
             return Request.Post(url)
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index e8d2f6b..c93ea54 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -272,6 +272,11 @@ public abstract class AbstractProcessingElementBuilder<BU extends
     return me();
   }
 
+  /**
+   * Marks the pipeline element being built as stateful by setting the {@code stateful} flag
+   * on its element description.
+   *
+   * @return the builder returned by {@code me()}
+   */
+  public BU containsState(){
+    this.elementDescription.setStateful(true);
+    return me();
+  }
+
 
   @Override
   public void prepareBuild() {
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 81fd4bd..d22e6d3 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -508,4 +508,5 @@ public class StreamPipes {
   public static final String HAS_CONTAINER_ENV_KEY = NS + "hasContainerEnvKey";
   public static final String HAS_CONTAINER_ENV_VALUE = NS + "hasContainerEnvValue";
     public static final String HAS_DEPLOYMENT_CONTAINER = NS + "hasDeploymentContainer";
+    public static final String IS_STATEFUL = NS + "isStateful";
 }
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
index 91fbf57..d07a3d4 100644
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
@@ -25,4 +25,14 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 
 public abstract class SparkDataProcessorDeclarer<B extends EventProcessorBindingParams>
     extends AbstractSparkDeclarer<DataProcessorDescription, DataProcessorInvocation, SparkDataProcessorRuntime> implements SemanticEventProcessingAgentDeclarer {
+    //If not overridden, elements are regarded as stateless
+    //TODO: Assess if this class is needed in its current form as it is not used in extensions atm
+    /**
+     * Default implementation: ignores the given state.
+     *
+     * @param state serialized state; unused here
+     */
+    @Override
+    public void setState(String state) {
+    }
+
+    /**
+     * Default implementation: reports no state.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public String getState() {
+        return null;
+    }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde02..07ee0cb 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -21,15 +21,29 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.standalone.state.DefaultStateCollector;
 
 import java.util.function.Supplier;
 
 public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
 
+  /** Collects and restores the processor state; {@code null} by default. */
+  protected DefaultStateCollector stateCollector = null;
+
   @Override
   public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
     Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
     return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
   }
 
+  /**
+   * Returns the state reported by the state collector.
+   *
+   * @return the collector's state, or {@code null} if no collector is set
+   */
+  @Override
+  public String getState() {
+    if (this.stateCollector == null) {
+      return null;
+    }
+    return this.stateCollector.getState();
+  }
+
+  /**
+   * Hands the given state to the state collector.
+   *
+   * Without a collector the call is a no-op rather than a NullPointerException, mirroring the
+   * null handling of {@link #getState()}.
+   *
+   * @param state serialized state to restore
+   */
+  @Override
+  public void setState(String state) {
+    if (this.stateCollector != null) {
+      this.stateCollector.setState(state);
+    }
+  }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
new file mode 100644
index 0000000..7683f4e
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.state;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.streampipes.wrapper.state.serializer.JacksonStateSerializer;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reflection-based state collector: captures and restores the values of the fields of a target
+ * object that are annotated with {@link StateObject}.
+ *
+ * Only fields declared directly on the target's class are considered, because they are looked
+ * up with {@code getDeclaredFields()}.
+ */
+public class DefaultStateCollector {
+
+    private final ArrayList<Field> fields;
+    private final HashMap<String, Field> fieldsMap;
+    private final Object obj;
+    private final StateSerializer serializer;
+
+
+    /**
+     * Creates a collector for {@code o} that uses a {@link JacksonStateSerializer}.
+     *
+     * @param o object whose annotated fields make up the state
+     */
+    public DefaultStateCollector(Object o){
+        this(o, new JacksonStateSerializer());
+    }
+
+    /**
+     * Creates a collector for {@code o} with the given serializer.
+     *
+     * @param o          object whose annotated fields make up the state
+     * @param serializer serializer used for the field values and the overall state map
+     */
+    public DefaultStateCollector(Object o, StateSerializer serializer){
+        this.obj = o;
+        this.serializer = serializer;
+        this.fields = new ArrayList<>();
+        this.fieldsMap = new HashMap<>();
+        //Only keep marked fields as part of the State, indexed by their name
+        for(Field f : o.getClass().getDeclaredFields()){
+            if(f.getAnnotation(StateObject.class) != null){
+                f.setAccessible(true);
+                this.fields.add(f);
+                this.fieldsMap.put(f.getName(), f);
+            }
+        }
+    }
+
+    /**
+     * Restores the annotated fields of the target object from a state string created by
+     * {@link #getState()}.
+     *
+     * A {@code null} state, a state that cannot be deserialized, and entries without a matching
+     * annotated field are skipped instead of failing with a NullPointerException.
+     *
+     * @param state serialized map from field name to {@link ObjectTypeTuple}
+     */
+    public void setState(String state){
+        if(state == null){
+            return;
+        }
+        Type t = new TypeReference<HashMap<String, ObjectTypeTuple>>(){}.getType();
+        String mapTypeSignature = serializer.getTypeSignature(t);
+        HashMap<String, ObjectTypeTuple> map = serializer.deserialize(state, mapTypeSignature);
+        if(map == null){
+            //JacksonStateSerializer returns null when the state cannot be parsed
+            return;
+        }
+        for(Map.Entry<String, ObjectTypeTuple> entry : map.entrySet()){
+            Field field = this.fieldsMap.get(entry.getKey());
+            ObjectTypeTuple tuple = entry.getValue();
+            if(field == null || tuple == null){
+                //No annotated field with this name, or no value recorded for it
+                continue;
+            }
+            try {
+                field.set(this.obj, serializer.deserialize(
+                        tuple.getObjectSerialization(), tuple.getGenericTypeSignature()));
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    /**
+     * Serializes the current non-null values of all annotated fields.
+     *
+     * @return serialized map from field name to {@link ObjectTypeTuple}
+     */
+    public String getState()  {
+        Map<String, ObjectTypeTuple> stateByField = new HashMap<>();
+        for(Field f : this.fields){
+            try {
+                //Read the value once so the null check and the serialization see the same object
+                Object value = f.get(this.obj);
+                if(value != null){
+                    stateByField.put(f.getName(), new ObjectTypeTuple(value, this.serializer, f.getGenericType()));
+                }
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+        }
+        return serializer.serialize(stateByField);
+    }
+
+}
\ No newline at end of file
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
similarity index 51%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
index 3daa469..343d738 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
@@ -15,25 +15,32 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.standalone.state;
 
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+import java.lang.reflect.Type;
 
-public interface IPipelineElementLifeCycle {
+/**
+ * Pairs the serialized form of an object with the type signature needed to deserialize it.
+ */
+public class ObjectTypeTuple {
 
-    void register(InvocableRegistration registration);
+    // Type signature as returned by StateSerializer#getTypeSignature
+    private String genericTypeSignature;
+    // Serialized object as returned by StateSerializer#serialize
+    private String objectSerialization;
 
-    Response invoke(InvocableStreamPipesEntity graph);
+    //Default constructor needed for Jackson deserialization
+    public ObjectTypeTuple(){}
 
-    Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
+    /**
+     * Serializes {@code obj} and records the signature of {@code type} using {@code serializer}.
+     *
+     * @param obj        object to serialize
+     * @param serializer serializer producing both the serialization and the type signature
+     * @param type       type whose signature is stored alongside the serialization
+     */
+    public ObjectTypeTuple(Object obj, StateSerializer serializer, Type type){
+        this.genericTypeSignature = serializer.getTypeSignature(type);
+        this.objectSerialization = serializer.serialize(obj);
+    }
 
-    Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
+    /** @return the stored type signature */
+    public String getGenericTypeSignature(){
+        return this.genericTypeSignature;
+    }
+
+    /** @return the stored serialized object */
+    public String getObjectSerialization(){
+        return this.objectSerialization;
+    }
 
-    Response offload(InvocableStreamPipesEntity graph);
 
-    void deregister();
 
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
similarity index 71%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
index bad4511..3c0554c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
 
-public enum PipelineElementLifeCycleState {
-    REGISTER,
-    INVOKE,
-    DETACH,
-    RECONFIGURE,
-    OFFLOAD,
-    DEREGISTER
+package org.apache.streampipes.wrapper.standalone.state;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation for fields, retained at runtime. {@code DefaultStateCollector} treats the
+ * annotated fields of an object as that object's state.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface StateObject {
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
index fa8c5bf..9a54f54 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
@@ -44,4 +44,13 @@ public abstract class EventProcessorDeclarer<B extends EventProcessorBindingPara
 		return invokeEPRuntime(graph);
 	}
 
+	//If not overridden, elements are regarded as stateless
+	/**
+	 * Default implementation: ignores the given state.
+	 *
+	 * @param state serialized state; unused here
+	 */
+	@Override
+	public void setState(String state) {
+	}
+
+	/**
+	 * Default implementation: reports no state.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public String getState() {
+		return null;
+	}
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
new file mode 100644
index 0000000..88f55ac
--- /dev/null
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.state.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+
+import java.lang.reflect.Type;
+
+/**
+ * {@link StateSerializer} backed by a Jackson {@link ObjectMapper}. Type signatures are
+ * Jackson's canonical type names.
+ */
+public class JacksonStateSerializer implements StateSerializer {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Writes {@code o} as a JSON string.
+     *
+     * @return the JSON text, or {@code null} if Jackson fails (the stack trace is printed)
+     */
+    @Override
+    public String serialize(Object o) {
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(o);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return json;
+    }
+
+    /**
+     * Reads {@code serializedObject} as the type named by the canonical {@code typeSignature}.
+     *
+     * @return the parsed value, or {@code null} if Jackson fails (the stack trace is printed)
+     */
+    @Override
+    public <T> T deserialize(String serializedObject, String typeSignature) {
+        TypeFactory factory = objectMapper.getTypeFactory();
+        JavaType targetType = factory.constructFromCanonical(typeSignature);
+        T parsed = null;
+        try {
+            parsed = objectMapper.readValue(serializedObject, targetType);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return parsed;
+    }
+
+    /**
+     * @return the canonical Jackson name of {@code type}
+     */
+    @Override
+    public String getTypeSignature(Type type) {
+        JavaType javaType = objectMapper.constructType(type);
+        return javaType.toCanonical();
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
similarity index 75%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
index bad4511..3807fcc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.state.serializer;
 
-public enum PipelineElementLifeCycleState {
-    REGISTER,
-    INVOKE,
-    DETACH,
-    RECONFIGURE,
-    OFFLOAD,
-    DEREGISTER
+import java.lang.reflect.Type;
+
+/**
+ * Converts objects to and from a string representation and produces string type signatures
+ * that tell {@link #deserialize(String, String)} which type to create.
+ */
+public interface StateSerializer {
+    /** Returns the string serialization of {@code o}. */
+    String serialize(Object o);
+    /** Recreates an object from {@code serializedObject} using the type named by {@code typeSignature}. */
+    <T> T deserialize(String serializedObject, String typeSignature);
+    /** Returns the signature of {@code type} in the form expected by {@code deserialize}. */
+    String getTypeSignature(Type type);
 }
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f0e9657..9a7f187 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -1083,6 +1083,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
     preemption: boolean;
     priorityScore: number;
     resourceRequirements: NodeResourceRequirementUnion[];
+    stateful: boolean;
     staticProperties: StaticPropertyUnion[];
     statusInfoSettings: ElementStatusInfoSettings;
     streamRequirements: SpDataStreamUnion[];
@@ -1115,6 +1116,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
         instance.preemption = data.preemption;
         instance.configured = data.configured;
         instance.uncompleted = data.uncompleted;
+        instance.stateful = data.stateful;
         return instance;
     }
 }

[incubator-streampipes] 03/03: preliminary stateful migration implementation

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d3045870f02e8d551962ca24a541d04227a35238
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:09:22 2021 +0100

    preliminary stateful migration implementation
---
 .../execution/http/StateEndpointUrlGenerator.java  | 75 ++++++++++++++++
 .../manager/execution/http/StateSubmitter.java     | 99 ++++++++++++++++++++++
 .../pipeline/AbstractPipelineExecutor.java         | 10 +++
 .../pipeline/PipelineMigrationExecutor.java        | 82 ++++++++++++++++++
 4 files changed, 266 insertions(+)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
new file mode 100644
index 0000000..2df02dc
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.config.consul.ConsulSpConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+/**
+ * Builds the URL of the state endpoint for an invocable pipeline element on its deployment
+ * target node. Generating the URL also rewrites the endpoint fields of the wrapped entity.
+ */
+public class StateEndpointUrlGenerator{
+    protected static final String HTTP_PROTOCOL = "http://";
+    protected static final String COLON = ":";
+    protected static final String SLASH = "/";
+
+    private static final String ELEMENT_ROUTE = "api/v2/node/element";
+    private static final String STATE_ROUTE = "/state";
+    private static final String DATA_PROCESSOR_PREFIX = "sepa";
+
+    private final InvocableStreamPipesEntity entity;
+
+    public StateEndpointUrlGenerator(InvocableStreamPipesEntity entity){
+        this.entity = entity;
+    }
+
+    /**
+     * Updates the entity's endpoint fields and returns the state endpoint URL, composed of the
+     * target node's host and port, the element route, the app id and the running instance id.
+     */
+    public String generateStateEndpoint(){
+        modifyInvocableElement();
+        String nodeAddress = HTTP_PROTOCOL + entity.getDeploymentTargetNodeHostname()
+                + COLON + entity.getDeploymentTargetNodePort();
+        String elementPath = ELEMENT_ROUTE + SLASH + DATA_PROCESSOR_PREFIX
+                + SLASH + entity.getAppId()
+                + SLASH + entity.getDeploymentRunningInstanceId();
+        return nodeAddress + SLASH + elementPath + STATE_ROUTE;
+    }
+
+    /**
+     * Overwrites endpoint host, port, belongsTo and element id of the entity with values looked
+     * up in Consul for the deployment target node.
+     */
+    private void modifyInvocableElement() {
+        // Necessary because secondary pipeline element description is not stored in backend
+        // It uses information from primary pipeline element. Node controller will locally forward
+        // request accordingly, thus fields must be correct.
+        String consulRoute = ConsulSpConfig.SERVICE_ROUTE_PREFIX
+                + entity.getElementEndpointServiceName() + SLASH
+                + ConsulSpConfig.BASE_PREFIX + SLASH
+                + ConsulSpConfig.SECONDARY_NODE_KEY + SLASH
+                + entity.getDeploymentTargetNodeId() + SLASH;
+
+        String host = ConsulUtil.getValueForRoute(consulRoute + "SP_HOST", String.class);
+        int port = ConsulUtil.getValueForRoute(consulRoute + "SP_PORT", Integer.class);
+
+        entity.setElementEndpointHostname(host);
+        entity.setElementEndpointPort(port);
+
+        String owner = HTTP_PROTOCOL + host + COLON + port + SLASH + DATA_PROCESSOR_PREFIX + SLASH + entity.getAppId();
+        entity.setBelongsTo(owner);
+
+        String elementId = entity.getBelongsTo() + SLASH + entity.getDeploymentRunningInstanceId();
+        entity.setElementId(elementId);
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
new file mode 100644
index 0000000..dc3b7e5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Reads and writes the state of a pipeline element via the HTTP endpoint generated by
+ * {@link StateEndpointUrlGenerator}.
+ */
+public class StateSubmitter {
+    protected final static Logger LOG = LoggerFactory.getLogger(StateSubmitter.class);
+
+    private static final Integer CONNECT_TIMEOUT = 10000;
+
+    private final String pipelineId;
+    private final String pipelineName;
+    private final InvocableStreamPipesEntity entity;
+    private final String elementState;
+
+    /**
+     * @param pipelineId   id of the pipeline the element belongs to
+     * @param pipelineName name of the pipeline the element belongs to
+     * @param entity       pipeline element whose state is read or written
+     * @param elementState state to submit in {@link #setElementState()}; not used by
+     *                     {@link #getElementState()}
+     */
+    public StateSubmitter( String pipelineId, String pipelineName, InvocableStreamPipesEntity entity,
+                                     String elementState) {
+        this.pipelineId = pipelineId;
+        this.pipelineName = pipelineName;
+        this.elementState = elementState;
+        this.entity = entity;
+    }
+
+    /**
+     * Sends the configured state to the element's state endpoint with a PUT request.
+     *
+     * @return the status parsed from the reply, or an unsuccessful status carrying the exception
+     *         message if the request or the parsing fails
+     */
+    public PipelineElementStatus setElementState(){
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Setting state of pipeline element: {}", endpoint);
+
+        try {
+            Response httpResp = Request
+                    .Put(endpoint)
+                    .bodyString(elementState, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            return failureStatus(endpoint, e);
+        }
+    }
+
+    /**
+     * Requests the element's state from its state endpoint with a GET request.
+     *
+     * @return the status parsed from the reply, or an unsuccessful status carrying the exception
+     *         message if the request or the parsing fails
+     */
+    public PipelineElementStatus getElementState(){
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Getting state of pipeline element: {}", endpoint);
+
+        try {
+            Response httpResp = Request
+                    .Get(endpoint)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            return failureStatus(endpoint, e);
+        }
+    }
+
+    /** Logs the failed request including its stack trace and converts it into an unsuccessful status. */
+    private PipelineElementStatus failureStatus(String endpoint, Exception e) {
+        // Passing the exception as last argument keeps the stack trace; e.getMessage() alone may be null
+        LOG.error("State request to pipeline element {} failed", endpoint, e);
+        return new PipelineElementStatus(endpoint, entity.getName(),
+                false, e.getMessage());
+    }
+
+    /** Parses the reply body as a StreamPipes response and maps it onto a {@link PipelineElementStatus}. */
+    private PipelineElementStatus handleResponse(String endpoint, Response httpResp) throws JsonSyntaxException,
+            IOException {
+        String resp = httpResp.returnContent().asString();
+        org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
+                .getObjectMapper()
+                .readValue(resp, org.apache.streampipes.model.Response.class);
+
+        return new PipelineElementStatus(endpoint,
+                entity.getName(),
+                streamPipesResp.isSuccess(),
+                streamPipesResp.getOptionalMessage());
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
index 2853a14..01c4f2c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 
+import org.apache.streampipes.manager.execution.http.StateSubmitter;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
@@ -35,6 +36,7 @@ import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.storage.api.INodeDataStreamRelay;
@@ -130,6 +132,14 @@ public abstract class AbstractPipelineExecutor {
                 relays).detachRelaysOnMigration();
     }
 
+    /**
+     * Requests the state of the given pipeline element through a {@link StateSubmitter}.
+     *
+     * @param graph pipeline element to query
+     * @return the status returned by {@code StateSubmitter#getElementState()}
+     */
+    protected PipelineElementStatus getState(InvocableStreamPipesEntity graph){
+        // A read carries no state payload, hence the null state argument
+        StateSubmitter submitter =
+                new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, null);
+        return submitter.getElementState();
+    }
+
+    /**
+     * Submits the given state to the given pipeline element through a {@link StateSubmitter}.
+     *
+     * @param graph pipeline element to update
+     * @param state state to submit
+     * @return the status returned by {@code StateSubmitter#setElementState()}
+     */
+    protected PipelineElementStatus setState(InvocableStreamPipesEntity graph, String state){
+        StateSubmitter submitter =
+                new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, state);
+        return submitter.setElementState();
+    }
+
     protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
                                                           InvocableStreamPipesEntity target){
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index e1f40e4..7115d40 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -82,6 +82,12 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
     }
 
     public PipelineOperationStatus migratePipelineElement() {
+        // Dispatch on the stateful flag of the migration target element
+        boolean stateful = this.migrationEntity.getTargetElement().isStateful();
+        return stateful ? migrateStatefulPipelineElement() : migrateStatelessPipelineElement();
+    }
+
+    public PipelineOperationStatus migrateStatelessPipelineElement() {
 
         PipelineOperationStatus status = initPipelineOperationStatus();
 
@@ -155,6 +161,82 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
         return status;
     }
 
+    public PipelineOperationStatus migrateStatefulPipelineElement() {
+        // TODO: Assess how this 'dirty' implementation/procedure can be improved
+        // Migrates a stateful element: reads the state of the source element and applies it to the target element before the relays are switched.
+        PipelineOperationStatus status = initPipelineOperationStatus();
+
+        // 1. fetch state of origin element
+        // 2. start new element and apply the fetched state to it
+        // 3. stop relay to origin element, then start relay to new element
+        // 4. stop origin element and origin relay
+        // 5. store invocation graphs and relays, then set the global status
+
+        prepareMigration();
+
+        // Fetch state of origin element; on failure mark the operation as failed and return without any rollback
+        PipelineElementStatus statusGettingState = getState(this.migrationEntity.getSourceElement());
+        if(!statusGettingState.isSuccess()){
+            // NOTE(review): status.addPipelineElementStatus(statusGettingState) is disabled, so the failed element status is not recorded - confirm intent
+            status.setSuccess(false);
+            return status;
+        }
+        String currentState = statusGettingState.getOptionalMessage(); // the state string is read from the status' optional message
+        // Start target pipeline elements and relays on new target node
+        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+        if(!statusStartTarget.isSuccess()){
+            //Target PE could not be started; nothing to roll back
+            return status;
+        }
+
+        // Apply the fetched state to the newly invoked pipeline element; on failure roll back (step one) and return
+         PipelineElementStatus statusSettingState = setState(this.migrationEntity.getTargetElement(), currentState);
+
+        if(!statusSettingState.isSuccess()){
+            // NOTE(review): status.addPipelineElementStatus(statusSettingState) is disabled, so the failed element status is not recorded - confirm intent
+            status.setSuccess(false);
+            rollbackToPreMigrationStepOne(new PipelineOperationStatus(), status); // a fresh, empty operation status is passed as first argument here
+            return status;
+        }
+
+        // Stop relays from origin predecessor; on failure roll back (step one)
+        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+        if(!statusStopRelays.isSuccess()){
+            rollbackToPreMigrationStepOne(statusStopRelays, status);
+            return status;
+        }
+
+        // Start relays to target after migration; on failure roll back (step two)
+        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+        if(!statusStartRelays.isSuccess()){
+            rollbackToPreMigrationStepTwo(statusStartRelays, status);
+            return status;
+        }
+
+        // Stop origin and associated relay; on failure roll back (step three)
+        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+        if(!statusStopOrigin.isSuccess()){
+            rollbackToPreMigrationStepThree(status);
+            return status;
+        }
+
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(pipeline.getActions());
+        graphs.addAll(pipeline.getSepas());
+
+        List<SpDataSet> dataSets = findDataSets();
+
+        // store new pipeline and relays
+        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+        deleteDataStreamRelayContainer(relaysToBeDeleted);
+        storeDataStreamRelayContainer(relaysToBePersisted);
+
+        // set global status: successful only if every recorded element status is successful
+        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+
+        return status;
+    }
+
     private void prepareMigration() {
         //Purge existing relays
         purgeExistingRelays();