You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:23 UTC
[incubator-streampipes] 02/03: introduce state to processors
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 8e6390da5fd4a2fcd459b39e66ce0b4e96a63e71
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:06:51 2021 +0100
introduce state to processors
---
.../api/InvocablePipelineElementResource.java | 36 +++++
.../SemanticEventProcessingAgentDeclarer.java | 3 +-
.../model/base/ConsumableStreamPipesEntity.java | 17 +++
.../model/base/InvocableStreamPipesEntity.java | 15 +++
.../model/graph/DataProcessorInvocation.java | 2 +
.../controller/api/InvocableEntityResource.java | 25 ++++
.../management/pe/IPipelineElementLifeCycle.java | 4 +
.../pe/PipelineElementLifeCycleState.java | 2 +
.../management/pe/PipelineElementManager.java | 10 ++
.../pe/handler/PipelineElementStateHandler.java | 146 +++++++++++++++++++++
.../node/controller/utils/HttpUtils.java | 17 +++
.../builder/AbstractProcessingElementBuilder.java | 5 +
.../apache/streampipes/vocabulary/StreamPipes.java | 1 +
.../wrapper/spark/SparkDataProcessorDeclarer.java | 10 ++
.../standalone/StreamPipesDataProcessor.java | 14 ++
.../standalone/state/DefaultStateCollector.java | 92 +++++++++++++
.../wrapper/standalone/state/ObjectTypeTuple.java | 31 +++--
.../wrapper/standalone/state/StateObject.java | 18 +--
.../wrapper/declarer/EventProcessorDeclarer.java | 9 ++
.../state/serializer/JacksonStateSerializer.java | 57 ++++++++
.../wrapper/state/serializer/StateSerializer.java | 15 +--
ui/src/app/core-model/gen/streampipes-model.ts | 2 +
22 files changed, 502 insertions(+), 29 deletions(-)
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index 0b55885..578de6d 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.container.api;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.declarer.Declarer;
import org.apache.streampipes.container.declarer.InvocableDeclarer;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -150,6 +151,41 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
}
+
+ @GET
+ @Path("{elementId}/{runningInstanceId}/state")
+ @Produces(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response getState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+
+ InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+
+ if (runningInstance != null && runningInstance instanceof SemanticEventProcessingAgentDeclarer) {
+ String serializedState = ((SemanticEventProcessingAgentDeclarer) runningInstance).getState();
+
+ return ok(new Response(elementId, true, serializedState));
+ }
+
+ return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+ }
+
+ @PUT
+ @Path("{elementId}/{runningInstanceId}/state")
+ @Produces(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response setState(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId, String payload) {
+
+ InvocableDeclarer runningInstance = RunningInstances.INSTANCE.getInvocation(runningInstanceId);
+
+
+ if (runningInstance != null && runningInstance instanceof SemanticEventProcessingAgentDeclarer) {
+ ((SemanticEventProcessingAgentDeclarer) runningInstance).setState(payload);
+ return ok(new Response(elementId, true, "State update successful."));
+ }
+
+ return ok(new Response(elementId, false, "Could not find the running instance with id: " + runningInstanceId));
+ }
+
+
protected abstract P getExtractor(I graph);
protected abstract I createGroundingDebugInformation(I graph);
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
index 8ae0aff..9c43470 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProcessingAgentDeclarer.java
@@ -22,5 +22,6 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
public interface SemanticEventProcessingAgentDeclarer extends InvocableDeclarer<DataProcessorDescription, DataProcessorInvocation>{
-
+ void setState(String state);
+ String getState();
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
index 6236014..39bf423 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
@@ -66,11 +66,26 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
@RdfProperty(StreamPipes.ELEMENT_ENDPOINT_SERVICE_NAME)
private String elementEndpointServiceName;
+ @OneToOne(fetch = FetchType.EAGER,
+ cascade = {CascadeType.ALL})
+ @RdfProperty(StreamPipes.IS_STATEFUL)
+ private boolean isStateful;
+
+
+ public boolean isStateful() {
+ return isStateful;
+ }
+
+ public void setStateful(boolean stateful) {
+ isStateful = stateful;
+ }
+
public ConsumableStreamPipesEntity() {
super();
this.spDataStreams = new ArrayList<>();
this.staticProperties = new ArrayList<>();
this.resourceRequirements = new ArrayList<>();
+ this.setStateful(false);
}
public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
@@ -78,6 +93,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
this.spDataStreams = new ArrayList<>();
this.staticProperties = new ArrayList<>();
this.resourceRequirements = new ArrayList<>();
+ this.setStateful(false);
}
public ConsumableStreamPipesEntity(ConsumableStreamPipesEntity other) {
@@ -95,6 +111,7 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
if (other.getResourceRequirements() != null) {
this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
}
+ this.isStateful = other.isStateful;
}
public List<SpDataStream> getSpDataStreams() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 08c25a8..de46346 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -108,12 +108,27 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
private boolean uncompleted;
+ @OneToOne(fetch = FetchType.EAGER,
+ cascade = {CascadeType.ALL})
+ @RdfProperty(StreamPipes.IS_STATEFUL)
+ private boolean isStateful;
+
+
+ public boolean isStateful() {
+ return isStateful;
+ }
+
+ public void setStateful(boolean stateful) {
+ isStateful = stateful;
+ }
+
public InvocableStreamPipesEntity() {
super();
}
public InvocableStreamPipesEntity(InvocableStreamPipesEntity other) {
super(other);
+ this.setStateful(other.isStateful());
this.belongsTo = other.getBelongsTo();
this.correspondingPipeline = other.getCorrespondingPipeline();
this.inputStreams = new Cloner().streams(other.getInputStreams());
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 1e7a4d6..3adb822 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -87,6 +87,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
this.setOutputStreamRelays(sepa.getOutputStreamRelays());
this.setEventRelayStrategy(sepa.getEventRelayStrategy());
this.setResourceRequirements(sepa.getResourceRequirements());
+ this.setStateful(sepa.isStateful());
//this.setUri(belongsTo +"/" +getElementId());
}
@@ -103,6 +104,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
this.pathName = other.getPathName();
this.eventRelayStrategy = other.getEventRelayStrategy();
this.category = new Cloner().epaTypes(other.getCategory());
+ this.setStateful(other.isStateful());
if (other.getResourceRequirements() != null) {
this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index f950fa5..d1e4399 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -91,6 +91,31 @@ public class InvocableEntityResource extends AbstractResource {
return ok(resp);
}
+ @GET
+ @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+ @Produces(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response getState(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId,
+ @PathParam("runningInstanceId") String runningInstanceId) {
+ InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+ Response resp = PipelineElementManager.getInstance().getState(graph, runningInstanceId);
+
+ return ok(resp);
+ }
+
+ @PUT
+ @Path("{identifier}/{elementId}/{runningInstanceId}/state")
+ @Produces(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response setState(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId,
+ @PathParam("runningInstanceId") String runningInstanceId,
+ String state) {
+ InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+ Response resp = PipelineElementManager.getInstance().setState(graph, runningInstanceId, state);
+
+ return ok(resp);
+ }
+
// Live-Reconfiguration
@POST
@JacksonSerialized
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
index 3daa469..cbd3194 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
@@ -30,6 +30,10 @@ public interface IPipelineElementLifeCycle {
Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
+ Response getState(InvocableStreamPipesEntity graph, String runningInstanceId);
+
+ Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state);
+
Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
Response offload(InvocableStreamPipesEntity graph);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
index bad4511..1f1edb5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
@@ -23,5 +23,7 @@ public enum PipelineElementLifeCycleState {
DETACH,
RECONFIGURE,
OFFLOAD,
+ GETSTATE,
+ SETSTATE,
DEREGISTER
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
index 7927814..0e87da4 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementManager.java
@@ -80,6 +80,16 @@ public class PipelineElementManager implements IPipelineElementLifeCycle {
}
@Override
+ public Response getState(InvocableStreamPipesEntity graph, String runningInstanceId) {
+ return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.GETSTATE, runningInstanceId).handle();
+ }
+
+ @Override
+ public Response setState(InvocableStreamPipesEntity graph, String runningInstanceId, String state) {
+ return new PipelineElementStateHandler(graph, PipelineElementLifeCycleState.SETSTATE, runningInstanceId, state).handle();
+ }
+
+ @Override
public void deregister(){
// TODO: unregister element from Consul
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
new file mode 100644
index 0000000..b661f56
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementStateHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.pe.handler;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.node.controller.management.IHandler;
+import org.apache.streampipes.node.controller.management.pe.PipelineElementLifeCycleState;
+import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class PipelineElementStateHandler implements IHandler<Response> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineElementInteractionHandler.class.getCanonicalName());
+ private static final String SLASH = "/";
+ private static final String STATE = "state";
+ private static final long RETRY_INTERVAL_MS = 5000;
+
+ private final InvocableStreamPipesEntity graph;
+ private final PipelineElementLifeCycleState type;
+ private final String runningInstanceId;
+ private final String state;
+
+
+ public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId) {
+ this(graph, type, runningInstanceId, "{}");
+ }
+
+ public PipelineElementStateHandler(InvocableStreamPipesEntity graph, PipelineElementLifeCycleState type, String runningInstanceId, String state) {
+ this.graph = graph;
+ this.type = type;
+ this.runningInstanceId = runningInstanceId;
+ this.state = state;
+ }
+
+
+ @Override
+ public Response handle() {
+ switch(type) {
+ case GETSTATE:
+ return getState();
+ case SETSTATE:
+ return setState();
+ default:
+ throw new SpRuntimeException("Life cycle step not supported" + type);
+ }
+ }
+
+
+ private Response getState(){
+ Response response = new Response();
+ String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+ LOG.info("Trying to get state of pipeline element: {}", url);
+ boolean connected = false;
+ while (!connected) {
+
+ response = HttpUtils.get(url, Response.class);
+ connected = response.isSuccess();
+
+ response.setOptionalMessage(response.getOptionalMessage());
+
+ if (!connected) {
+ LOG.debug("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to get State of pipeline element: " + url, e);
+ }
+ }
+ }
+ LOG.info("Successfully retrieved state from pipeline element {}", url);
+ return response;
+ }
+
+ private Response setState(){
+ Response response = new Response();
+ String url = graph.getBelongsTo() + SLASH + runningInstanceId + SLASH + STATE;
+
+ LOG.info("Trying to set state of pipeline element: {}", url);
+ boolean connected = false;
+ while (!connected) {
+
+
+ response = HttpUtils.putAndRespond(url, state);
+
+ connected = response.isSuccess();
+
+ if (!connected) {
+ LOG.debug("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to set State of pipeline element: " + url, e);
+ }
+ }
+ }
+ LOG.info("Successfully retrieved state from pipeline element {}", url);
+ return response;
+ }
+
+
+ //TODO: Optimize compression and move compression to PipelineElement Container (Brotli or Zstandard)
+ //Preliminary compression logic copied from https://gist.github.com/yfnick/227e0c12957a329ad138
+ private static byte[] compress(String data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ gzip.write(data.getBytes());
+ gzip.close();
+ byte[] compressed = bos.toByteArray();
+ bos.close();
+ return compressed;
+ }
+
+ private static String decompress(final byte[] compressed) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ byte[] bytes = IOUtils.toByteArray(gis);
+ return new String(bytes, "UTF-8");
+ }
+
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
index f1e0536..378bf0b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -126,6 +126,23 @@ public class HttpUtils {
}
}
+ //TODO: Naming is provisional; rename
+ public static org.apache.streampipes.model.Response putAndRespond(String url, String body) {
+ org.apache.streampipes.model.Response response = new org.apache.streampipes.model.Response();
+ javax.ws.rs.core.Response.ok().build();
+ response.setSuccess(false);
+ try {
+ response = deserialize(Request.Put(url)
+ .bodyString(body, ContentType.APPLICATION_JSON)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute(), org.apache.streampipes.model.Response.class);
+ return response;
+ } catch (IOException e) {
+ LOG.error("Something went wrong during PUT request", e);
+ return response;
+ }
+ }
+
public static String post(String url, String body) {
try {
return Request.Post(url)
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index e8d2f6b..c93ea54 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -272,6 +272,11 @@ public abstract class AbstractProcessingElementBuilder<BU extends
return me();
}
+ public BU containsState(){
+ this.elementDescription.setStateful(true);
+ return me();
+ }
+
@Override
public void prepareBuild() {
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 81fd4bd..d22e6d3 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -508,4 +508,5 @@ public class StreamPipes {
public static final String HAS_CONTAINER_ENV_KEY = NS + "hasContainerEnvKey";
public static final String HAS_CONTAINER_ENV_VALUE = NS + "hasContainerEnvValue";
public static final String HAS_DEPLOYMENT_CONTAINER = NS + "hasDeploymentContainer";
+ public static final String IS_STATEFUL = NS + "isStateful";
}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
index 91fbf57..d07a3d4 100644
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
@@ -25,4 +25,14 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
public abstract class SparkDataProcessorDeclarer<B extends EventProcessorBindingParams>
extends AbstractSparkDeclarer<DataProcessorDescription, DataProcessorInvocation, SparkDataProcessorRuntime> implements SemanticEventProcessingAgentDeclarer {
+ //If not overwritten elements are regarded as stateless
+ //TODO: Assess if this class is needed in its current form as it is not used in extensions atm
+ @Override
+ public void setState(String state) {
+ }
+
+ @Override
+ public String getState() {
+ return null;
+ }
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde02..07ee0cb 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -21,15 +21,29 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.standalone.state.DefaultStateCollector;
import java.util.function.Supplier;
public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
+ protected DefaultStateCollector stateCollector = null;
+
@Override
public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
}
+ @Override
+ public String getState() {
+ if(this.stateCollector != null)
+ return this.stateCollector.getState();
+ return null;
+ }
+
+ @Override
+ public void setState(String state) {
+ this.stateCollector.setState(state);
+ }
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
new file mode 100644
index 0000000..7683f4e
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/DefaultStateCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.state;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.streampipes.wrapper.state.serializer.JacksonStateSerializer;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultStateCollector {
+
+ private ArrayList<Field> fields;
+ private HashMap<String, Field> fieldsMap;
+ private Object obj;
+ private StateSerializer serializer;
+
+
+ public DefaultStateCollector(Object o){
+ this(o, new JacksonStateSerializer());
+ }
+
+ public DefaultStateCollector(Object o, StateSerializer serializer){
+ this.obj = o;
+ this.fields = new ArrayList<>(Arrays.asList(o.getClass().getDeclaredFields()));
+ this.serializer = serializer;
+ //Only keep marked fields as part of the State
+ for(Field f : o.getClass().getDeclaredFields()){
+ if(f.getAnnotation(StateObject.class) == null){
+ this.fields.remove(f);
+ }
+ }
+
+ this.fieldsMap = new HashMap<>();
+ //Make a map of all fields with their respective Names
+ for(Field f: fields){
+ f.setAccessible(true);
+ this.fieldsMap.put(f.getName(), f);
+ }
+ }
+
+ public void setState(String state){
+ Type t = new TypeReference<HashMap<String, ObjectTypeTuple>>(){}.getType();
+ String mapTypeSignature = serializer.getTypeSignature(t);
+ HashMap<String, ObjectTypeTuple> map = serializer.deserialize(state, mapTypeSignature);
+ for(Map.Entry<String, ObjectTypeTuple> entry : map.entrySet()){
+ try {
+ this.fieldsMap.get(entry.getKey()).set(this.obj, serializer.deserialize(
+ entry.getValue().getObjectSerialization(), entry.getValue().getGenericTypeSignature()));
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ public String getState() {
+ Map<String, ObjectTypeTuple> list = new HashMap<>();
+ for(Field f : this.fields){
+ try {
+ if(f.get(this.obj) != null){
+ ObjectTypeTuple objectTypeTuple = new ObjectTypeTuple(f.get(this.obj), this.serializer, f.getGenericType());
+ list.put(f.getName(), objectTypeTuple);
+ }
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+ }
+ return serializer.serialize(list);
+ }
+
+}
\ No newline at end of file
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
similarity index 51%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
index 3daa469..343d738 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/IPipelineElementLifeCycle.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/ObjectTypeTuple.java
@@ -15,25 +15,32 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.standalone.state;
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
+import org.apache.streampipes.wrapper.state.serializer.StateSerializer;
+import java.lang.reflect.Type;
-public interface IPipelineElementLifeCycle {
+public class ObjectTypeTuple {
- void register(InvocableRegistration registration);
+ private String genericTypeSignature;
+ private String objectSerialization;
- Response invoke(InvocableStreamPipesEntity graph);
+ //Default constructor needed for Jackson deserialization
+ public ObjectTypeTuple(){}
- Response detach(InvocableStreamPipesEntity graph, String runningInstanceId);
+ public ObjectTypeTuple(Object obj, StateSerializer serializer, Type type){
+ this.genericTypeSignature = serializer.getTypeSignature(type);
+ this.objectSerialization = serializer.serialize(obj);
+ }
- Response reconfigure(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEvent);
+ public String getGenericTypeSignature(){
+ return this.genericTypeSignature;
+ }
+
+ public String getObjectSerialization(){
+ return this.objectSerialization;
+ }
- Response offload(InvocableStreamPipesEntity graph);
- void deregister();
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
similarity index 71%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
index bad4511..3c0554c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/state/StateObject.java
@@ -15,13 +15,15 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.management.pe;
-public enum PipelineElementLifeCycleState {
- REGISTER,
- INVOKE,
- DETACH,
- RECONFIGURE,
- OFFLOAD,
- DEREGISTER
+package org.apache.streampipes.wrapper.standalone.state;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface StateObject {
}
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
index fa8c5bf..9a54f54 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
@@ -44,4 +44,13 @@ public abstract class EventProcessorDeclarer<B extends EventProcessorBindingPara
return invokeEPRuntime(graph);
}
+ //If not overwritten elements are regarded as stateless
+ @Override
+ public void setState(String state) {
+ }
+
+ @Override
+ public String getState() {
+ return null;
+ }
}
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
new file mode 100644
index 0000000..88f55ac
--- /dev/null
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/JacksonStateSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.state.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+
+import java.lang.reflect.Type;
+
+public class JacksonStateSerializer implements StateSerializer {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public String serialize(Object o) {
+ try {
+ return objectMapper.writeValueAsString(o);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public <T> T deserialize(String serializedObject, String typeSignature) {
+ TypeFactory typeFactory = objectMapper.getTypeFactory();
+ JavaType inputType = typeFactory.constructFromCanonical(typeSignature);
+ try {
+ return objectMapper.readValue(serializedObject, inputType);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public String getTypeSignature(Type type) {
+ return objectMapper.constructType(type).toCanonical();
+ }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
similarity index 75%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
index bad4511..3807fcc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/PipelineElementLifeCycleState.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/state/serializer/StateSerializer.java
@@ -15,13 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.management.pe;
+package org.apache.streampipes.wrapper.state.serializer;
-public enum PipelineElementLifeCycleState {
- REGISTER,
- INVOKE,
- DETACH,
- RECONFIGURE,
- OFFLOAD,
- DEREGISTER
+import java.lang.reflect.Type;
+
+public interface StateSerializer {
+ String serialize(Object o);
+ <T> T deserialize(String serializedObject, String typeSignature);
+ String getTypeSignature(Type type);
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f0e9657..9a7f187 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -1083,6 +1083,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
preemption: boolean;
priorityScore: number;
resourceRequirements: NodeResourceRequirementUnion[];
+ stateful: boolean;
staticProperties: StaticPropertyUnion[];
statusInfoSettings: ElementStatusInfoSettings;
streamRequirements: SpDataStreamUnion[];
@@ -1115,6 +1116,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
instance.preemption = data.preemption;
instance.configured = data.configured;
instance.uncompleted = data.uncompleted;
+ instance.stateful = data.stateful;
return instance;
}
}