You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/03/12 10:01:18 UTC
[incubator-streampipes] branch edge-extensions updated: [WIP] Added
PE side control stream route
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new f4ca8a4 [WIP] Added PE side control stream route
f4ca8a4 is described below
commit f4ca8a4a7e61abb803aff8878e78a3fe341210d4
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Fri Mar 12 10:57:49 2021 +0100
[WIP] Added PE side control stream route
---
.../container/api/InvocableEntityResource.java | 22 +++++--
.../management/pe/InvocableElementManager.java | 42 +++++++++++++
.../management/pe/PipelineElementLifeCycle.java | 2 +
.../StreamPipesReconfigurableProcessor.java | 11 ++--
.../standalone/manager/ProtocolManager.java | 49 ++++++++++-----
.../AbstractStandaloneSpInputCollector.java | 69 ++++++++++++++++++++++
.../StandaloneReconfigurationSpInputCollector.java | 44 ++++++++++++++
.../routing/StandaloneSpInputCollector.java | 36 ++---------
.../runtime/StandaloneEventProcessorRuntime.java | 20 +++++++
.../runtime/StandalonePipelineElementRuntime.java | 21 +++++++
.../wrapper/routing/RawDataProcessor.java | 2 +
.../ReconfigurableElement.java} | 8 +--
12 files changed, 265 insertions(+), 61 deletions(-)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 116b42c..7ba5233 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -18,10 +18,20 @@
package org.apache.streampipes.node.controller.container.api;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -30,6 +40,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
+import java.nio.charset.StandardCharsets;
@Path("/api/v2/node/element")
public class InvocableEntityResource extends AbstractResource {
@@ -89,11 +100,14 @@ public class InvocableEntityResource extends AbstractResource {
// Adaptation
@POST
@JacksonSerialized
- @Path("/adapt")
+ @Path("{identifier}/{elementId}/{runningInstanceId}/adapt")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response adapt() {
- //TODO: Push adaptation to pipeline element adaptation topic that is received via side-input in on configure.
- return ok();
+ public javax.ws.rs.core.Response adapt(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId,
+ @PathParam("runningInstanceId") String runningInstanceId,
+ String payload) {
+ InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
+ return ok(InvocableElementManager.getInstance().adapt(graph, payload));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 9891ee6..8d0e99e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -18,14 +18,23 @@
package org.apache.streampipes.node.controller.container.management.pe;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.node.NodeManager;
@@ -34,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
@@ -115,6 +125,38 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
}
}
+ @Override
+ public Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent) {
+ ObjectMapper mapper = new ObjectMapper();
+ Response r = new Response();
+ r.setElementId(graph.getElementId());
+ r.setSuccess(false);
+ try{
+ TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
+ .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
+ .getEventGrounding().getTransportProtocol().getClass());
+ tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
+ + graph.getDeploymentRunningInstanceId());
+ EventProducer pub;
+ if(tp instanceof KafkaTransportProtocol){
+ pub = new SpKafkaProducer();
+ pub.connect(tp);
+ }else if (tp instanceof JmsTransportProtocol){
+ pub = new ActiveMQPublisher();
+ pub.connect(tp);
+ } else{
+ pub = new MqttPublisher();
+ pub.connect(tp);
+ }
+ pub.publish(reconfigurationEvent.getBytes(StandardCharsets.UTF_8));
+ pub.disconnect();
+ r.setSuccess(true);
+ } catch (JsonProcessingException e) {
+ r.setOptionalMessage(e.getMessage());
+ }
+ return r;
+ }
+
private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
try {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 107c717..cdaf13e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -29,6 +29,8 @@ public interface PipelineElementLifeCycle {
Response detach(String runningInstanceId);
+ Response adapt(InvocableStreamPipesEntity graph, String reconfigurationEvent);
+
void unregister();
}
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
similarity index 75%
copy from streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
copy to streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
index 648b9b5..3f10359 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesReconfigurableProcessor.java
@@ -15,13 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.routing;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+package org.apache.streampipes.wrapper.standalone;
-import java.util.Map;
+import org.apache.streampipes.wrapper.runtime.ReconfigurableElement;
-public interface RawDataProcessor {
+public abstract class StreamPipesReconfigurableProcessor extends StreamPipesDataProcessor
+ implements ReconfigurableElement {
- void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
-}
+}
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index 8ad8fa7..015b099 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.wrapper.standalone.manager;
+import org.apache.streampipes.wrapper.standalone.routing.AbstractStandaloneSpInputCollector;
+import org.apache.streampipes.wrapper.standalone.routing.StandaloneReconfigurationSpInputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -32,6 +34,7 @@ import java.util.Map;
public class ProtocolManager {
public static Map<String, StandaloneSpInputCollector> consumers = new HashMap<>();
+ public static Map<String, StandaloneReconfigurationSpInputCollector> reconfigurationConsumers = new HashMap<>();
public static Map<String, StandaloneSpOutputCollector> producers = new HashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
@@ -41,38 +44,46 @@ public class ProtocolManager {
// in empire serializers
public static <T extends TransportProtocol> StandaloneSpInputCollector findInputCollector
- (T
- protocol,
+ (T protocol,
+ TransportFormat format, Boolean singletonEngine)
+ throws
+ SpRuntimeException {
+
+ if (!consumers.containsKey(topicName(protocol))) {
+ consumers.put(topicName(protocol), makeInputCollector(protocol, format, singletonEngine));
+ LOG.info("Adding new consumer to consumer map (size=" + consumers.size() + "): " + topicName(protocol));
+ }
+ return consumers.get(topicName(protocol));
+
+ }
+
+ public static <T extends TransportProtocol> StandaloneReconfigurationSpInputCollector findReconfigurationInputCollector
+ (T protocol,
TransportFormat format, Boolean
singletonEngine)
throws
SpRuntimeException {
- if (consumers.containsKey(topicName(protocol))) {
- return consumers.get(topicName(protocol));
- } else {
- consumers.put(topicName(protocol), makeInputCollector(protocol, format, singletonEngine));
- LOG.info("Adding new consumer to consumer map (size=" +consumers.size() +"): " +topicName(protocol));
- return consumers.get(topicName(protocol));
+ if (!reconfigurationConsumers.containsKey(topicName(protocol))) {
+ reconfigurationConsumers.put(topicName(protocol), makeReconfigurationInputCollector(protocol, format, singletonEngine));
+ LOG.info("Adding new reconfiguration consumer to reconfiguration consumer map (size=" + consumers.size() + "): " + topicName(protocol));
}
+ return reconfigurationConsumers.get(topicName(protocol));
}
public static <T extends TransportProtocol> StandaloneSpOutputCollector findOutputCollector
- (T
- protocol,
+ (T protocol,
TransportFormat format)
throws
SpRuntimeException {
- if (producers.containsKey(topicName(protocol))) {
- return producers.get(topicName(protocol));
- } else {
+ if (!producers.containsKey(topicName(protocol))) {
producers.put(topicName(protocol), makeOutputCollector(protocol, format));
- LOG.info("Adding new producer to producer map (size=" +producers.size() +"): " +topicName
+ LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName
(protocol));
- return producers.get(topicName(protocol));
}
+ return producers.get(topicName(protocol));
}
@@ -84,6 +95,14 @@ public class ProtocolManager {
return new StandaloneSpInputCollector<>(protocol, format, singletonEngine);
}
+ private static <T extends TransportProtocol> StandaloneReconfigurationSpInputCollector makeReconfigurationInputCollector
+ (T protocol,
+ TransportFormat format, Boolean
+ singletonEngine) throws
+ SpRuntimeException {
+ return new StandaloneReconfigurationSpInputCollector(protocol, format, singletonEngine);
+ }
+
public static <T extends TransportProtocol> StandaloneSpOutputCollector makeOutputCollector(T
protocol, TransportFormat format)
throws
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java
new file mode 100644
index 0000000..786d784
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/AbstractStandaloneSpInputCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.routing;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.wrapper.routing.RawDataProcessor;
+import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
+public abstract class AbstractStandaloneSpInputCollector <T extends TransportProtocol, DP extends RawDataProcessor> extends
+ StandaloneSpCollector<T, DP>
+ implements
+ InternalEventProcessor<byte[]> {
+
+ private Boolean singletonEngine;
+
+
+ public AbstractStandaloneSpInputCollector(T protocol, TransportFormat format,
+ Boolean singletonEngine) throws SpRuntimeException {
+ super(protocol, format);
+ this.singletonEngine = singletonEngine;
+ }
+
+ @Override
+ public void onEvent(byte[] event) {
+ if (singletonEngine) {
+ send(consumers.get(consumers.keySet().toArray()[0]), event);
+ } else {
+ consumers.forEach((key, value) -> send(value, event));
+ }
+ }
+
+ abstract void send(DP rawDataProcessor, byte[] event);
+
+ @Override
+ public void connect() throws SpRuntimeException {
+ if (!protocolDefinition.getConsumer().isConnected()) {
+ protocolDefinition.getConsumer().connect( transportProtocol,this);
+ }
+ }
+
+ @Override
+ public void disconnect() throws SpRuntimeException {
+ if (protocolDefinition.getConsumer().isConnected()) {
+ if (consumers.size() == 0) {
+ protocolDefinition.getConsumer().disconnect();
+ ProtocolManager.removeInputCollector(transportProtocol);
+ }
+ }
+ }
+
+}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java
new file mode 100644
index 0000000..4832418
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneReconfigurationSpInputCollector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone.routing;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.wrapper.routing.RawDataProcessor;
+import org.apache.streampipes.wrapper.routing.SpInputCollector;
+
+public class StandaloneReconfigurationSpInputCollector<T extends TransportProtocol> extends
+ AbstractStandaloneSpInputCollector<T, RawDataProcessor>
+ implements SpInputCollector {
+
+
+ public StandaloneReconfigurationSpInputCollector(T protocol, TransportFormat format,
+ Boolean singletonEngine) throws SpRuntimeException {
+ super(protocol, format, singletonEngine);
+ }
+
+ void send(RawDataProcessor rawDataProcessor, byte[] event) {
+ try {
+ rawDataProcessor.reconfigure(dataFormatDefinition.toMap(event));
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index c9cea39..80a707d 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -27,29 +27,17 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
public class StandaloneSpInputCollector<T extends TransportProtocol> extends
- StandaloneSpCollector<T, RawDataProcessor>
- implements
- InternalEventProcessor<byte[]>, SpInputCollector {
-
- private Boolean singletonEngine;
+ AbstractStandaloneSpInputCollector<T, RawDataProcessor>
+ implements SpInputCollector {
public StandaloneSpInputCollector(T protocol, TransportFormat format,
Boolean singletonEngine) throws SpRuntimeException {
- super(protocol, format);
- this.singletonEngine = singletonEngine;
+ super(protocol, format, singletonEngine);
}
@Override
- public void onEvent(byte[] event) {
- if (singletonEngine) {
- send(consumers.get(consumers.keySet().toArray()[0]), event);
- } else {
- consumers.forEach((key, value) -> send(value, event));
- }
- }
-
- private void send(RawDataProcessor rawDataProcessor, byte[] event) {
+ void send(RawDataProcessor rawDataProcessor, byte[] event) {
try {
rawDataProcessor.process(dataFormatDefinition.toMap(event), getTopic());
} catch (SpRuntimeException e) {
@@ -57,20 +45,4 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
}
}
- @Override
- public void connect() throws SpRuntimeException {
- if (!protocolDefinition.getConsumer().isConnected()) {
- protocolDefinition.getConsumer().connect( transportProtocol,this);
- }
- }
-
- @Override
- public void disconnect() throws SpRuntimeException {
- if (protocolDefinition.getConsumer().isConnected()) {
- if (consumers.size() == 0) {
- protocolDefinition.getConsumer().disconnect();
- ProtocolManager.removeInputCollector(transportProtocol);
- }
- }
- }
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index b8d88ab..c5e259c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -20,12 +20,14 @@ package org.apache.streampipes.wrapper.standalone.runtime;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
+import org.apache.streampipes.wrapper.runtime.ReconfigurableElement;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
import java.util.Map;
@@ -60,10 +62,23 @@ public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingPara
}
@Override
+ public void reconfigure(Map<String, Object> rawEvent) throws SpRuntimeException {
+ if (getEngine() instanceof ReconfigurableElement){
+ ((ReconfigurableElement) getEngine()).onReconfigurationEvent(EventFactory.fromMap(rawEvent));
+ }
+ }
+
+ @Override
public void bindRuntime() throws SpRuntimeException {
bindEngine();
getInputCollectors().forEach(is -> is.registerConsumer(instanceId, this));
prepareRuntime();
+
+ if (getEngine() instanceof ReconfigurableElement){
+ SpInputCollector reconfigurationInputCollector = getReconfigurationInputCollector();
+ reconfigurationInputCollector.registerConsumer(instanceId, this);
+ reconfigurationInputCollector.connect();
+ }
}
@Override
@@ -82,6 +97,11 @@ public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingPara
}
getOutputCollector().disconnect();
+
+ if(getEngine() instanceof ReconfigurableElement){
+ getReconfigurationInputCollector().unregisterConsumer(instanceId);
+ getReconfigurationInputCollector().disconnect();
+ }
}
@Override
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
index 948383c..d482de5 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
@@ -17,9 +17,12 @@
*/
package org.apache.streampipes.wrapper.standalone.runtime;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.wrapper.context.RuntimeContext;
import org.apache.streampipes.wrapper.params.binding.BindingParams;
import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
@@ -28,6 +31,7 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.runtime.PipelineElement;
import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+import org.apache.streampipes.wrapper.standalone.routing.StandaloneReconfigurationSpInputCollector;
import java.util.ArrayList;
import java.util.List;
@@ -67,6 +71,23 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
return inputCollectors;
}
+ public SpInputCollector getReconfigurationInputCollector() throws SpRuntimeException{
+ ObjectMapper mapper = new ObjectMapper();
+ InvocableStreamPipesEntity graph = params.getBindingParams().getGraph();
+ try {
+ TransportProtocol tp = mapper.readValue(mapper.writeValueAsString(graph.getInputStreams().get(0)
+ .getEventGrounding().getTransportProtocol()), graph.getInputStreams().get(0)
+ .getEventGrounding().getTransportProtocol().getClass());
+ tp.getTopicDefinition().setActualTopicName("org.apache.streampipes.control.event.reconfigure."
+ + graph.getDeploymentRunningInstanceId());
+
+ return ProtocolManager.findReconfigurationInputCollector(tp,
+ graph.getInputStreams().get(0).getEventGrounding().getTransportFormats().get(0), true);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
public abstract void bindEngine() throws SpRuntimeException;
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
index 648b9b5..81e96dd 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
@@ -24,4 +24,6 @@ import java.util.Map;
public interface RawDataProcessor {
void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
+
+ default void reconfigure(Map<String, Object> rawEvent) throws SpRuntimeException {}
}
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
similarity index 80%
copy from streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
index 648b9b5..25316d9 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/RawDataProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ReconfigurableElement.java
@@ -15,13 +15,13 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.routing;
+package org.apache.streampipes.wrapper.runtime;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
-import java.util.Map;
+public interface ReconfigurableElement {
-public interface RawDataProcessor {
+ void onReconfigurationEvent(Event event) throws SpRuntimeException;
- void process(Map<String, Object> rawEvent, String sourceInfo) throws SpRuntimeException;
}