You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/03/12 10:01:18 UTC

[incubator-streampipes] branch edge-extensions updated: [WIP] Added PE side control stream route

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new f4ca8a4  [WIP] Added PE side control stream route
f4ca8a4 is described below

commit f4ca8a4a7e61abb803aff8878e78a3fe341210d4
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Fri Mar 12 10:57:49 2021 +0100

    [WIP] Added PE side control stream route
---
 .../container/api/InvocableEntityResource.java     | 22 +++++--
 .../management/pe/InvocableElementManager.java     | 42 +++++++++++++
 .../management/pe/PipelineElementLifeCycle.java    |  2 +
 .../StreamPipesReconfigurableProcessor.java        | 11 ++--
 .../standalone/manager/ProtocolManager.java        | 49 ++++++++++-----
 .../AbstractStandaloneSpInputCollector.java        | 69 ++++++++++++++++++++++
 .../StandaloneReconfigurationSpInputCollector.java | 44 ++++++++++++++
 .../routing/StandaloneSpInputCollector.java        | 36 ++---------
 .../runtime/StandaloneEventProcessorRuntime.java   | 20 +++++++
 .../runtime/StandalonePipelineElementRuntime.java  | 21 +++++++
 .../wrapper/routing/RawDataProcessor.java          |  2 +
 .../ReconfigurableElement.java}                    |  8 +--
 12 files changed, 265 insertions(+), 61 deletions(-)

diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 116b42c..7ba5233 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -18,10 +18,20 @@
 package org.apache.streampipes.node.controller.container.api;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -30,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
+import java.nio.charset.StandardCharsets;
 
 @Path("/api/v2/node/element")
 public class InvocableEntityResource extends AbstractResource {
@@ -89,11 +100,14 @@ public class InvocableEntityResource extends AbstractResource {
     // Adaptation
     @POST
     @JacksonSerialized
-    @Path("/adapt")
+    @Path("{identifier}/{elementId}/{runningInstanceId}/adapt")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public javax.ws.rs.core.Response adapt() {
-        //TODO: Push adaptation to pipeline element adaptation topic that is received via side-input in on configure.
-        return ok();
+    public javax.ws.rs.core.Response adapt(@PathParam("identifier") String identifier,
+                                           @PathParam("elementId") String elementId,
+                                           @PathParam("runningInstanceId") String runningInstanceId,
+                                           String payload) {
+        InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+        return ok(InvocableElementManager.getInstance().adapt(graph, payload));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 9891ee6..8d0e99e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -18,14 +18,23 @@
 package org.apache.streampipes.node.controller.container.management.pe;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
@@ -34,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 
@@ -115,6 +125,38 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         }
     }
 
+    @Override
+    public Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent) {
+        ObjectMapper mapper = new ObjectMapper();
+        Response r = new Response();
+        r.setElementId(graph.getElementId());
+        r.setSuccess(false);
+        try{
+            TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
+                    .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
+                    .getEventGrounding().getTransportProtocol().getClass());
+            tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
+                    + graph.getDeploymentRunningInstanceId());
+            EventProducer pub;
+            if(tp instanceof KafkaTransportProtocol){
+                pub = new SpKafkaProducer();
+                pub.connect(tp);
+            }else if (tp instanceof JmsTransportProtocol){
+                pub = new ActiveMQPublisher();
+                pub.connect(tp);
+            } else{
+                pub = new MqttPublisher();
+                pub.connect(tp);
+            }
+            pub.publish(reconfigurationEvent.getBytes(StandardCharsets.UTF_8));
+            pub.disconnect();
+            r.setSuccess(true);
+        } catch (JsonProcessingException e) {
+            r.setOptionalMessage(e.getMessage());
+        }
+        return r;
+    }
+
     private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
         setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
         try {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 107c717..cdaf13e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -29,6 +29,8 @@ public interface PipelineElementLifeCycle {
 
     Response detach(String runningInstanceId);
 
+    Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent);
+
     void unregister();
 
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
similarity index 75%
copy from streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
index 648b9b5..3f10359 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.routing;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+package org.apache.streampipes.wrapper.standalone;
 
-import java.util.Map;
+import org.apache.streampipes.wrapper.runtime.ReconfigurableElement;
 
-public interface RawDataProcessor {
+public abstract class StreamPipesReconfigurableProcessor extends StreamPipesDataProcessor
+        implements ReconfigurableElement {
 
-  void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
-}
+}
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index 8ad8fa7..015b099 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.wrapper.standalone.manager;
 
+import org.apache.streampipes.wrapper.standalone.routing.AbstractStandaloneSpInputCollector;
+import org.apache.streampipes.wrapper.standalone.routing.StandaloneReconfigurationSpInputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -32,6 +34,7 @@ import java.util.Map;
 public class ProtocolManager {
 
   public static Map<String, StandaloneSpInputCollector> consumers = new HashMap<>();
+  public static Map<String, StandaloneReconfigurationSpInputCollector> reconfigurationConsumers = new HashMap<>();
   public static Map<String, StandaloneSpOutputCollector> producers = new HashMap<>();
 
   private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
@@ -41,38 +44,46 @@ public class ProtocolManager {
   // in empire serializers
 
   public static <T extends TransportProtocol> StandaloneSpInputCollector findInputCollector
-          (T
-                   protocol,
+          (T protocol,
+           TransportFormat format, Boolean singletonEngine)
+          throws
+          SpRuntimeException {
+
+    if (!consumers.containsKey(topicName(protocol))) {
+      consumers.put(topicName(protocol), makeInputCollector(protocol, format, singletonEngine));
+      LOG.info("Adding new consumer to consumer map (size=" + consumers.size() + "): " + topicName(protocol));
+    }
+    return consumers.get(topicName(protocol));
+
+  }
+
+  public static <T extends TransportProtocol> StandaloneReconfigurationSpInputCollector findReconfigurationInputCollector
+          (T protocol,
            TransportFormat format, Boolean
                    singletonEngine)
           throws
           SpRuntimeException {
 
-    if (consumers.containsKey(topicName(protocol))) {
-      return consumers.get(topicName(protocol));
-    } else {
-      consumers.put(topicName(protocol), makeInputCollector(protocol, format, singletonEngine));
-      LOG.info("Adding new consumer to consumer map (size=" +consumers.size() +"): " +topicName(protocol));
-      return consumers.get(topicName(protocol));
+    if (!reconfigurationConsumers.containsKey(topicName(protocol))) {
+      reconfigurationConsumers.put(topicName(protocol), makeReconfigurationInputCollector(protocol, format, singletonEngine));
+      LOG.info("Adding new reconfiguration consumer to reconfiguration consumer map (size=" + consumers.size() + "): " + topicName(protocol));
     }
+    return reconfigurationConsumers.get(topicName(protocol));
 
   }
 
   public static <T extends TransportProtocol> StandaloneSpOutputCollector findOutputCollector
-          (T
-                   protocol,
+          (T protocol,
            TransportFormat format)
           throws
           SpRuntimeException {
 
-    if (producers.containsKey(topicName(protocol))) {
-      return producers.get(topicName(protocol));
-    } else {
+    if (!producers.containsKey(topicName(protocol))) {
       producers.put(topicName(protocol), makeOutputCollector(protocol, format));
-      LOG.info("Adding new producer to producer map (size=" +producers.size() +"): " +topicName
+      LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName
               (protocol));
-      return producers.get(topicName(protocol));
     }
+    return producers.get(topicName(protocol));
 
   }
 
@@ -84,6 +95,14 @@ public class ProtocolManager {
     return new StandaloneSpInputCollector<>(protocol, format, singletonEngine);
   }
 
+  private static <T extends TransportProtocol> StandaloneReconfigurationSpInputCollector makeReconfigurationInputCollector
+          (T protocol,
+           TransportFormat format, Boolean
+                   singletonEngine) throws
+          SpRuntimeException {
+    return new StandaloneReconfigurationSpInputCollector(protocol, format, singletonEngine);
+  }
+
   public static <T extends TransportProtocol> StandaloneSpOutputCollector makeOutputCollector(T
                                                                                                 protocol, TransportFormat format)
           throws
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java
new file mode 100644
index 0000000..786d784
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.routing;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.wrapper.routing.RawDataProcessor;
+import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
+public abstract class AbstractStandaloneSpInputCollector <T extends TransportProtocol, DP extends RawDataProcessor> extends
+        StandaloneSpCollector<T, DP>
+        implements
+        InternalEventProcessor<byte[]> {
+
+    private Boolean singletonEngine;
+
+
+    public AbstractStandaloneSpInputCollector(T protocol, TransportFormat format,
+                                                     Boolean singletonEngine) throws SpRuntimeException {
+        super(protocol, format);
+        this.singletonEngine = singletonEngine;
+    }
+
+    @Override
+    public void onEvent(byte[] event) {
+        if (singletonEngine) {
+            send(consumers.get(consumers.keySet().toArray()[0]), event);
+        } else {
+            consumers.forEach((key, value) -> send(value, event));
+        }
+    }
+
+    abstract void send(DP rawDataProcessor, byte[] event);
+
+    @Override
+    public void connect() throws SpRuntimeException {
+        if (!protocolDefinition.getConsumer().isConnected()) {
+            protocolDefinition.getConsumer().connect( transportProtocol,this);
+        }
+    }
+
+    @Override
+    public void disconnect() throws SpRuntimeException {
+        if (protocolDefinition.getConsumer().isConnected()) {
+            if (consumers.size() == 0) {
+                protocolDefinition.getConsumer().disconnect();
+                ProtocolManager.removeInputCollector(transportProtocol);
+            }
+        }
+    }
+
+}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java
new file mode 100644
index 0000000..4832418
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.routing;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.wrapper.routing.RawDataProcessor;
+import org.apache.streampipes.wrapper.routing.SpInputCollector;
+
+public class StandaloneReconfigurationSpInputCollector<T extends TransportProtocol> extends
+        AbstractStandaloneSpInputCollector<T, RawDataProcessor>
+        implements SpInputCollector {
+
+
+    public StandaloneReconfigurationSpInputCollector(T protocol, TransportFormat format,
+                                                     Boolean singletonEngine) throws SpRuntimeException {
+        super(protocol, format, singletonEngine);
+    }
+
+    void send(RawDataProcessor rawDataProcessor, byte[] event) {
+        try {
+            rawDataProcessor.reconfigure(dataFormatDefinition.toMap(event));
+        } catch (SpRuntimeException e) {
+            e.printStackTrace();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index c9cea39..80a707d 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -27,29 +27,17 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
 
 public class StandaloneSpInputCollector<T extends TransportProtocol> extends
-        StandaloneSpCollector<T, RawDataProcessor>
-        implements
-        InternalEventProcessor<byte[]>, SpInputCollector {
-
-  private Boolean singletonEngine;
+        AbstractStandaloneSpInputCollector<T, RawDataProcessor>
+        implements SpInputCollector {
 
 
   public StandaloneSpInputCollector(T protocol, TransportFormat format,
                                     Boolean singletonEngine) throws SpRuntimeException {
-    super(protocol, format);
-    this.singletonEngine = singletonEngine;
+    super(protocol, format, singletonEngine);
   }
 
   @Override
-  public void onEvent(byte[] event) {
-    if (singletonEngine) {
-     send(consumers.get(consumers.keySet().toArray()[0]), event);
-    } else {
-      consumers.forEach((key, value) -> send(value, event));
-    }
-  }
-
-  private void send(RawDataProcessor rawDataProcessor, byte[] event) {
+  void send(RawDataProcessor rawDataProcessor, byte[] event) {
     try {
       rawDataProcessor.process(dataFormatDefinition.toMap(event), getTopic());
     } catch (SpRuntimeException e) {
@@ -57,20 +45,4 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
     }
   }
 
-  @Override
-  public void connect() throws SpRuntimeException {
-    if (!protocolDefinition.getConsumer().isConnected()) {
-        protocolDefinition.getConsumer().connect( transportProtocol,this);
-    }
-  }
-
-  @Override
-  public void disconnect() throws SpRuntimeException {
-    if (protocolDefinition.getConsumer().isConnected()) {
-      if (consumers.size() == 0) {
-        protocolDefinition.getConsumer().disconnect();
-        ProtocolManager.removeInputCollector(transportProtocol);
-      }
-    }
-  }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index b8d88ab..c5e259c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -20,12 +20,14 @@ package org.apache.streampipes.wrapper.standalone.runtime;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
 import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
+import org.apache.streampipes.wrapper.runtime.ReconfigurableElement;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
 
 import java.util.Map;
@@ -60,10 +62,23 @@ public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingPara
   }
 
   @Override
+  public void reconfigure(Map<String, Object> rawEvent) throws SpRuntimeException {
+    if (getEngine() instanceof ReconfigurableElement){
+      ((ReconfigurableElement) getEngine()).onReconfigurationEvent(EventFactory.fromMap(rawEvent));
+    }
+  }
+
+  @Override
   public void bindRuntime() throws SpRuntimeException {
     bindEngine();
     getInputCollectors().forEach(is -> is.registerConsumer(instanceId, this));
     prepareRuntime();
+
+    if (getEngine() instanceof ReconfigurableElement){
+      SpInputCollector reconfigurationInputCollector = getReconfigurationInputCollector();
+      reconfigurationInputCollector.registerConsumer(instanceId, this);
+      reconfigurationInputCollector.connect();
+    }
   }
 
   @Override
@@ -82,6 +97,11 @@ public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingPara
     }
 
     getOutputCollector().disconnect();
+
+    if(getEngine() instanceof ReconfigurableElement){
+      getReconfigurationInputCollector().unregisterConsumer(instanceId);
+      getReconfigurationInputCollector().disconnect();
+    }
   }
 
   @Override
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
index 948383c..d482de5 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
@@ -17,9 +17,12 @@
  */
 package org.apache.streampipes.wrapper.standalone.runtime;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
@@ -28,6 +31,7 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.runtime.PipelineElement;
 import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+import org.apache.streampipes.wrapper.standalone.routing.StandaloneReconfigurationSpInputCollector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -67,6 +71,23 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
     return inputCollectors;
   }
 
+  public SpInputCollector getReconfigurationInputCollector() throws SpRuntimeException{
+    ObjectMapper mapper = new ObjectMapper();
+    InvocableStreamPipesEntity graph = params.getBindingParams().getGraph();
+    try {
+      TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
+              .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
+              .getEventGrounding().getTransportProtocol().getClass());
+      tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
+              + graph.getDeploymentRunningInstanceId());
+
+      return ProtocolManager.findReconfigurationInputCollector(tp,
+              graph.getInputStreams().get(0).getEventGrounding().getTransportFormats().get(0), true);
+    } catch (JsonProcessingException e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
   public abstract void bindEngine() throws SpRuntimeException;
 
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
index 648b9b5..81e96dd 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
@@ -24,4 +24,6 @@ import java.util.Map;
 public interface RawDataProcessor {
 
   void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
+
+  default void reconfigure(Map<String, Object> rawEvent) throws SpRuntimeException {}
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
similarity index 80%
copy from streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
index 648b9b5..25316d9 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.routing;
+package org.apache.streampipes.wrapper.runtime;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
 
-import java.util.Map;
+public interface ReconfigurableElement {
 
-public interface RawDataProcessor {
+    void onReconfigurationEvent(Event event) throws SpRuntimeException;
 
-  void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
 }