You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/09/17 20:30:19 UTC

[incubator-streampipes] branch edge-extensions updated: preliminary modification of endpoint

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 1cfe528  preliminary modification of endpoint
1cfe528 is described below

commit 1cfe528bb9373b9af7a884a99b97b8499eb54ea4
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jul 31 12:55:23 2020 +0200

    preliminary modification of endpoint
---
 .../container/api/NodeControllerResource.java      | 22 +++++-
 .../management/pe/PipelineElementManager.java      |  2 +-
 .../http/InvocableEntityUrlGenerator.java          | 83 +++++++++-------------
 .../runtime/PipelineElementRuntimeInfoFetcher.java | 32 ++++-----
 4 files changed, 72 insertions(+), 67 deletions(-)

diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
index 4f9947f..fb77e4b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
@@ -17,7 +17,10 @@ package org.apache.streampipes.node.controller.container.api;/*
  */
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.declarer.InvocableDeclarer;
+import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.transform.Transformer;
+import org.apache.streampipes.container.util.Util;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -27,6 +30,8 @@ import org.apache.streampipes.node.controller.container.management.pe.PipelineEl
 import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
 import org.eclipse.paho.client.mqttv3.MqttException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -35,6 +40,9 @@ import java.io.IOException;
 
 @Path("/node/container")
 public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(NodeControllerResource.class.getCanonicalName());
+
 
     private static final String COLON = ":";
 
@@ -97,8 +105,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
 //                eventRelayManager.start();
 //                RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
 
-                pipelineElementEndpoint = graph.getBelongsTo();
-                PipelineElementManager.getInstance().invokePipelineElement(pipelineElementEndpoint, payload);
+                PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
             }
             else if (identifier.equals("sec")) {
                 graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
@@ -115,6 +122,17 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
                 .build();
     }
 
+    // TODO move endpoint to /elementId/instances/runningInstanceId
+    @DELETE
+    @Path("{elementId}/{runningInstanceId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public String detach(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+
+        LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
+
+        return Util.toResponseString(elementId, false, "Could not find the running instance with id: " + runningInstanceId);
+    }
+
     @POST
     @Path("/detach")
     @Consumes(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
index 5c41916..2d529ed 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
@@ -56,7 +56,7 @@ public class PipelineElementManager {
      * invokes pipeline elements when pipeline is started
      */
     public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
-        LOG.info("Invoking element: {}" + pipelineElementEndpoint);
+        LOG.info("Invoking element: {}", pipelineElementEndpoint);
         try {
             Response httpResp = Request
                     .Post(pipelineElementEndpoint)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 2e6cff8..2999aca 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -28,15 +28,15 @@ import java.util.Optional;
 
 public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> {
 
+    private static final String COLON = ":";
+    private static final String SLASH = "/";
     protected static final String SINK_IDENTIFIER = "sec";
     protected static final String PROCESSOR_IDENTIFIER = "sepa";
-
     private static final String DEFAULT_NODE_ID = "default";
-    private static final String COLON = ":";
     private static final String PE_PORT_KEY = "SP_PORT";
     private static final String PE_HOST_KEY = "SP_HOST";
 
-    private static final String NODE_CONTROLLER_ROUTE = "node/container/invoke";
+    private static final String NODE_CONTROLLER_ROUTE = "node/container";
 
     public InvocableEntityUrlGenerator(InvocableStreamPipesEntity pipelineElement) {
         super(pipelineElement);
@@ -44,26 +44,32 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
 
     @Override
     public String generateStartPipelineEndpointUrl() {
-//        // TODO: normal setup
-//        return URLPREFIX
-//                + getHost()
-//                + SLASH
-//                + getIdentifier()
-//                + SLASH
-//                + pipelineElement.getAppId();
-        // TODO: uncomment for edge tests
-        return URLPREFIX
-                + getHost()
-                + SLASH
-                + NODE_CONTROLLER_ROUTE
-                + SLASH
-                + getIdentifier()
-                + SLASH
-                + pipelineElement.getAppId();
+        if (pipelineElement.getDeploymentTargetNodeId() == null ||
+                pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
+            // default deployments to primary pipeline element
+            return URLPREFIX
+                    + getHost()
+                    + SLASH
+                    + getIdentifier()
+                    + SLASH
+                    + pipelineElement.getAppId();
+        } else {
+            // edge deployments to secondary pipeline element
+            return URLPREFIX
+                    + getHost()
+                    + SLASH
+                    + NODE_CONTROLLER_ROUTE
+                    + "invoke"
+                    + SLASH
+                    + getIdentifier()
+                    + SLASH
+                    + pipelineElement.getAppId();
+        }
     }
 
     @Override
     public String generateStopPipelineEndpointUrl() {
+        // TODO: handle stop requests
         return generateStartPipelineEndpointUrl()
                 + SLASH
                 + pipelineElement.getDeploymentRunningInstanceId();
@@ -87,10 +93,16 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
                         + pipelineElement.getDeploymentTargetNodeId()
                         + SLASH;
 
-                // Needed because secondary PE uses information from primary
-                pipelineElement.setElementEndpointPort(ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY));
-                pipelineElement.setElementEndpointHostname(ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY));
-                // TODO: adapt elementId + belongsTo
+                String host = ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY);
+                int port = ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY);
+
+                // Necessary because secondary pipeline element description is not stored in backend
+                // It uses information from primary pipeline element. Node controller will locally forward
+                // request accordingly, thus fields must be correct.
+                pipelineElement.setElementEndpointHostname(host);
+                pipelineElement.setElementEndpointPort(port);
+                pipelineElement.setBelongsTo(URLPREFIX + host + COLON + port + SLASH + getIdentifier() + SLASH + pipelineElement.getAppId());
+                pipelineElement.setElementId(pipelineElement.getBelongsTo() + SLASH + pipelineElement.getDeploymentRunningInstanceId());
 
                 return pipelineElement.getDeploymentTargetNodeHostname()
                         + COLON
@@ -100,31 +112,6 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
                 return defaultHost();
             }
         }
-//        else {
-//            Optional<NodeInfo> nodeInfoOpt = getNodeInfo();
-//            if (nodeInfoOpt.isPresent()) {
-//                NodeInfo nodeInfo = nodeInfoOpt.get();
-//                // TODO: get port from Consul
-//                String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
-//                        + pipelineElement.getElementEndpointServiceName()
-//                        + SLASH
-//                        + ConsulSpConfig.BASE_PREFIX
-//                        + SLASH
-//                        + ConsulSpConfig.SECONDARY_NODE_KEY
-//                        + SLASH
-//                        + nodeInfo.getNodeControllerId()
-//                        + SLASH
-//                        + PE_PORT_KEY;
-//
-//                return nodeInfo.getNodeMetadata().getNodeAddress()
-//                        + COLON
-//                        + nodeInfo.getNodeControllerPort();
-//                        //+ ConsulUtil.getPortForService(route);
-//            }
-//            else {
-//                return defaultHost();
-//            }
-//        }
     }
 
     private String defaultHost() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index d67c3f5..48ba076 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -35,11 +35,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 public enum PipelineElementRuntimeInfoFetcher {
-
   INSTANCE;
 
   Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
 
+  private int FETCH_INTERVAL_MS = 300;
   private Map<String, SpDataFormatConverter> converterMap;
 
   PipelineElementRuntimeInfoFetcher() {
@@ -58,6 +58,18 @@ public enum PipelineElementRuntimeInfoFetcher {
     }
   }
 
+  private TransportFormat getTransportFormat(SpDataStream spDataStream) {
+    return spDataStream.getEventGrounding().getTransportFormats().get(0);
+  }
+
+  private String getOutputTopic(SpDataStream spDataStream) {
+    return spDataStream
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
+  }
+
   private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
     final String[] result = {null};
     final String topic = getOutputTopic(spDataStream);
@@ -80,7 +92,7 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     while (result[0] == null) {
       try {
-        Thread.sleep(300);
+        Thread.sleep(FETCH_INTERVAL_MS);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
@@ -89,18 +101,6 @@ public enum PipelineElementRuntimeInfoFetcher {
     return result[0];
   }
 
-  private TransportFormat getTransportFormat(SpDataStream spDataStream) {
-    return spDataStream.getEventGrounding().getTransportFormats().get(0);
-  }
-
-  private String getOutputTopic(SpDataStream spDataStream) {
-    return spDataStream
-            .getEventGrounding()
-            .getTransportProtocol()
-            .getTopicDefinition()
-            .getActualTopicName();
-  }
-
   private String getLatestEventFromMqtt(SpDataStream spDataStream) throws SpRuntimeException {
     final String[] result = {null};
     String mqttTopic = getOutputTopic(spDataStream);
@@ -124,7 +124,7 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     while (result[0] == null) {
       try {
-        Thread.sleep(300);
+        Thread.sleep(FETCH_INTERVAL_MS);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
@@ -166,7 +166,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     long timeout = 0;
     while (result[0] == null && timeout < 6000) {
       try {
-        Thread.sleep(300);
+        Thread.sleep(FETCH_INTERVAL_MS);
         timeout = timeout + 300;
       } catch (InterruptedException e) {
         e.printStackTrace();