You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/17 09:17:21 UTC

[incubator-streampipes] branch edge-extensions updated (56656c3 -> 45b1f3c)

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 56656c3  [WIP] pipeline element live reconfiguration
     new 0ea252b  [STREAMPIPES-174] add new StreamPipesExternalDataProcessor
     new 45b1f3c  Add auto-restart for pipeline elments, adapters, relays in case of system reboot or container restart

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/InvocablePipelineElementResource.java      |  42 +++---
 .../controller/container/NodeControllerInit.java   |   9 +-
 .../container/management/RunningInstances.java     |   4 +
 .../container/management/node/NodeManager.java     |  17 ++-
 .../orchestrator/RunningContainerInstances.java    |   6 +
 .../management/pe/InvocableElementManager.java     |  23 ++++
 .../management/pe/RunningInvocableInstances.java   |   7 +
 .../management/relay/RunningRelayInstances.java    |   5 +
 .../controller/container/storage/CRUDStorage.java  |   2 +
 .../controller/container/storage/MapDBImpl.java    |   7 +
 .../StreamPipesExternalDataProcessor.java          | 145 +++++++++++++++++++++
 11 files changed, 244 insertions(+), 23 deletions(-)
 create mode 100644 streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java


[incubator-streampipes] 02/02: Add auto-restart for pipeline elments, adapters, relays in case of system reboot or container restart

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 45b1f3c0271b636989131b2a1ac4f0711fc38f97
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Mar 17 10:14:30 2021 +0100

    Add auto-restart for pipeline elments, adapters, relays in case of system reboot or container restart
---
 .../api/InvocablePipelineElementResource.java      | 42 +++++++++++++---------
 .../controller/container/NodeControllerInit.java   |  9 ++++-
 .../container/management/RunningInstances.java     |  4 +++
 .../container/management/node/NodeManager.java     | 17 ++++++---
 .../orchestrator/RunningContainerInstances.java    |  6 ++++
 .../management/pe/InvocableElementManager.java     | 23 ++++++++++++
 .../management/pe/RunningInvocableInstances.java   |  7 ++++
 .../management/relay/RunningRelayInstances.java    |  5 +++
 .../controller/container/storage/CRUDStorage.java  |  2 ++
 .../controller/container/storage/MapDBImpl.java    |  7 ++++
 10 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index e612c16..a56a7d3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -55,26 +55,30 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
     @JacksonSerialized
     public javax.ws.rs.core.Response invokeRuntime(@PathParam("elementId") String elementId, I graph) {
 
-        try {
-            if (isDebug()) {
-              graph = createGroundingDebugInformation(graph);
+        // in case element container is still running when backend/node controller crashes
+        if (!alreadyRunning(graph)) {
+            try {
+                if (isDebug()) {
+                    graph = createGroundingDebugInformation(graph);
+                }
+
+                InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
+
+                if (declarer != null) {
+                    //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
+                    String runningInstanceId = graph.getDeploymentRunningInstanceId();
+                    RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
+                    Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
+                    return ok(resp);
+                }
+            } catch (InstantiationException | IllegalAccessException e) {
+                e.printStackTrace();
+                return ok(new Response(elementId, false, e.getMessage()));
             }
 
-            InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
-
-            if (declarer != null) {
-                //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
-                String runningInstanceId = graph.getDeploymentRunningInstanceId();
-                RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
-                Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
-                return ok(resp);
-            }
-        } catch (InstantiationException | IllegalAccessException e) {
-            e.printStackTrace();
-            return ok(new Response(elementId, false, e.getMessage()));
+            return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
         }
-
-        return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
+        return ok(new Response(elementId, true, "Already running element with id: " + elementId));
     }
 
     @POST
@@ -147,5 +151,9 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
     private Boolean isDebug() {
         return "true".equals(System.getenv("SP_DEBUG"));
     }
+
+    private Boolean alreadyRunning(I graph) {
+        return RunningInstances.INSTANCE.getInvocation(graph.getDeploymentRunningInstanceId()) != null;
+    }
 }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index 2a3890f..c62d9c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.api.NodeControllerResour
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
+import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +32,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.Collections;
 
@@ -58,7 +60,7 @@ public class NodeControllerInit {
         LOG.info("Load node info description");
         NodeManager.getInstance().init();
 
-        LOG.info("Register node controller at backend");
+        LOG.info("Register node controller at node management");
         boolean success = NodeManager.getInstance().register();
 
         if (success) {
@@ -75,6 +77,11 @@ public class NodeControllerInit {
         } else throw new SpRuntimeException("Could not register node controller at backend");
     }
 
+    @PostConstruct
+    public void init() {
+        InvocableElementManager.getInstance().invokePipelineElementsOnSystemRebootOrRestart();
+    }
+
     @PreDestroy
     public void onExit(){
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
index 7091646..934f917 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.node.controller.container.management;
 
+import java.util.List;
+
 public interface RunningInstances<T> {
 
     void add(String id, T value);
@@ -25,5 +27,7 @@ public interface RunningInstances<T> {
 
     T get(String id);
 
+    List<T> getAll();
+
     void remove(String id);
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index dac74c2..472f38d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -26,14 +26,17 @@ import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.node.*;
+import org.apache.streampipes.model.node.container.DeploymentContainer;
 import org.apache.streampipes.model.node.container.DockerContainer;
 import org.apache.streampipes.model.node.meta.GeoLocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 public class NodeManager {
     private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
@@ -130,6 +133,10 @@ public class NodeManager {
         }
     }
 
+    public List<DeploymentContainer> getRegisteredContainer() {
+        return this.nodeInfo.getRegisteredContainers();
+    }
+
     // Interactions with core
 
     public boolean register() {
@@ -138,7 +145,7 @@ public class NodeManager {
             String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
             String endpoint = generateRegistrationEndpoint();
 
-            LOG.info("Trying to register node at node management: " + endpoint);
+            LOG.info("Trying to register node controller at node management: " + endpoint);
 
             while (!connected) {
                 connected = post(endpoint, body);
@@ -151,7 +158,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully registered node controller at node management");
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -178,7 +185,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully synced node controller with node management");
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -191,7 +198,7 @@ public class NodeManager {
         try {
             String endpoint = generateVersionEndpoint();
 
-            LOG.info("Trying to fetch StreamPipes version from core: " + endpoint);
+            LOG.info("Trying to retrieve StreamPipes version from backend: " + endpoint);
 
             while (!connected) {
                 Response response = get(endpoint);
@@ -210,7 +217,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully retrieved StreamPipes version from backend");
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 9b2e3a0..f1fdb63 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public enum RunningContainerInstances implements RunningInstances<DockerContainer> {
@@ -50,6 +51,11 @@ public enum RunningContainerInstances implements RunningInstances<DockerContaine
     }
 
     @Override
+    public List<DockerContainer> getAll() {
+        return mapDB.retrieveAll();
+    }
+
+    @Override
     public void remove(String id) {
         mapDB.delete(id);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index e15fd5c..04cf003 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -82,6 +82,7 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     public void register(InvocableRegistration registration) {
         registerAtConsul(registration);
         updateAndSyncNodeInfoDescription(registration);
+        invokePipelineElementsOnSystemRebootOrRestart();
         LOG.info("Successfully registered pipeline element container");
     }
 
@@ -149,6 +150,24 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         return response;
     }
 
+    public void invokePipelineElementsOnSystemRebootOrRestart() {
+        LOG.info("Checking for pipeline elements to be started ...");
+        getAllInvocables()
+                .forEach(graph -> {
+                    Response status = InvocableElementManager.getInstance().invoke(graph);
+                    if (status.isSuccess()) {
+                        if (status.getOptionalMessage().isEmpty()) {
+                            LOG.info("Pipeline element successfully restarted {}", status.getElementId());
+                        } else {
+                            LOG.info("Pipeline element already running {}", status.getElementId());
+                        }
+                    } else {
+                        LOG.error("Pipeline element could not be restarted - are the pipeline element containers " +
+                                "running? {}", status.getElementId());
+                    }
+                });
+    }
+
     private EventProducer getReconfigurationEventProducerFromInvocable(InvocableStreamPipesEntity graph) {
         TransportProtocol tp = getReconfigurationTransportProtocol(graph);
         EventProducer pub;
@@ -189,6 +208,10 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         }
     }
 
+    private List<InvocableStreamPipesEntity> getAllInvocables() {
+        return RunningInvocableInstances.INSTANCE.getAll();
+    }
+
     private NodeInfoDescription getNodeInfoDescription() {
         return NodeManager.getInstance().retrieveNodeInfoDescription();
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index bfa68a1..6c7d923 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
 import org.apache.streampipes.node.controller.container.management.RunningInstances;
 
 import java.io.File;
+import java.util.List;
 
 public enum RunningInvocableInstances implements RunningInstances<InvocableStreamPipesEntity> {
     INSTANCE;
@@ -48,7 +49,13 @@ public enum RunningInvocableInstances implements RunningInstances<InvocableStrea
     }
 
     @Override
+    public List<InvocableStreamPipesEntity> getAll() {
+        return mapDB.retrieveAll();
+    }
+
+    @Override
     public void remove(String id) {
         mapDB.delete(id);
     }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 085ed82..63baf7b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -44,6 +44,11 @@ public enum RunningRelayInstances implements RunningInstances<Map<String,EventRe
     }
 
     @Override
+    public List<Map<String, EventRelay>> getAll() {
+        return new ArrayList<>(runningInstances.values());
+    }
+
+    @Override
     public void remove(String id) {
         runningInstances.remove(id);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
index c588487..0937366 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
@@ -24,6 +24,8 @@ public interface CRUDStorage {
 
     <T>T retrieve(String id);
 
+    <T> List<T> retrieveAll();
+
     <T> void update(String id, T value);
 
     void delete(String id);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
index 8483fbd..f07302c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
@@ -22,7 +22,9 @@ import org.mapdb.DBMaker;
 import org.mapdb.Serializer;
 
 import java.io.File;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class MapDBImpl implements CRUDStorage {
 
@@ -58,6 +60,11 @@ public class MapDBImpl implements CRUDStorage {
     }
 
     @Override
+    public <T> List<T> retrieveAll() {
+        return map.values().stream().map(v -> (T) v).collect(Collectors.toList());
+    }
+
+    @Override
     public <T> void update(String id, T value) {
         map.put(id, value);
     }


[incubator-streampipes] 01/02: [STREAMPIPES-174] add new StreamPipesExternalDataProcessor

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 0ea252b245983cc257a8906fa06e1de8c7f066db
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Mar 16 00:45:11 2021 +0100

    [STREAMPIPES-174] add new StreamPipesExternalDataProcessor
---
 .../StreamPipesExternalDataProcessor.java          | 145 +++++++++++++++++++++
 1 file changed, 145 insertions(+)

diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
new file mode 100644
index 0000000..b3c73c2
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone;
+
+import com.google.gson.JsonObject;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
+        implements ExternalEventProcessor<ProcessorParams> {
+
+    // endpoint of Python processor runs here
+    private static final String PYTHON_ENDPOINT = "localhost:5000";
+
+    private String invocationId;
+    private String appId;
+    private String inputTopic;
+    private String outputTopic;
+    private String kafkaUrl;
+
+    @Override
+    public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+                                                                          ProcessingElementParameterExtractor extractor) {
+        EventProcessorBindingParams params = new ProcessorParams(graph);
+        invocationId = UUID.randomUUID().toString();
+        appId = graph.getAppId();
+        inputTopic = getInputTopic(params);
+        outputTopic = getOutputTopic(params);
+        kafkaUrl = getKafkaUrl(params);
+
+        Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
+        return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
+    }
+
+    protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
+        JsonObject json = new JsonObject();
+
+        json.addProperty("invocation_id", invocationId);
+        json.addProperty("processor_id", appId);
+        json.addProperty("input_topics", inputTopic);
+        json.addProperty("output_topics", outputTopic);
+        json.addProperty("bootstrap_servers", kafkaUrl);
+
+        JsonObject staticProperties = new JsonObject();
+        staticPropertyMap.forEach(staticProperties::addProperty);
+        json.add("static_properties", staticProperties);
+
+        return json;
+    }
+
+    protected void invoke(JsonObject json) {
+        post("invoke", json.toString());
+    }
+
+    protected void detach () {
+        JsonObject json = new JsonObject();
+        json.addProperty("invocation_id", invocationId);
+        post("detach", json.toString());
+    }
+
+    private static String post(String endpoint, String payload) {
+        String responseString = null;
+
+        try {
+            responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
+                    .bodyString(payload, ContentType.APPLICATION_JSON)
+                    .connectTimeout(1000)
+                    .socketTimeout(100000)
+                    .execute().returnContent().asString();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return responseString;
+    }
+
+    private String getInputTopic(EventProcessorBindingParams parameters) {
+        return parameters
+                .getGraph()
+                .getInputStreams()
+                .get(0)
+                .getEventGrounding()
+                .getTransportProtocol()
+                .getTopicDefinition()
+                .getActualTopicName();
+    }
+
+    private String getOutputTopic(EventProcessorBindingParams parameters) {
+        return parameters
+                .getGraph()
+                .getOutputStream()
+                .getEventGrounding()
+                .getTransportProtocol()
+                .getTopicDefinition()
+                .getActualTopicName();
+    }
+
+    private String getKafkaUrl(EventProcessorBindingParams parameters) {
+        String brokerHostname = parameters
+                .getGraph()
+                .getOutputStream()
+                .getEventGrounding()
+                .getTransportProtocols()
+                .get(0)
+                .getBrokerHostname();
+
+        Integer kafkaPort = ((KafkaTransportProtocol) parameters
+                .getGraph()
+                .getOutputStream()
+                .getEventGrounding()
+                .getTransportProtocols()
+                .get(0))
+                .getKafkaPort();
+
+        return brokerHostname + ":" + kafkaPort.toString();
+    }
+
+}
+
+