You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/09/17 20:30:19 UTC
[incubator-streampipes] branch edge-extensions updated: preliminary
modification of endpoint
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 1cfe528 preliminary modification of endpoint
1cfe528 is described below
commit 1cfe528bb9373b9af7a884a99b97b8499eb54ea4
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jul 31 12:55:23 2020 +0200
preliminary modification of endpoint
---
.../container/api/NodeControllerResource.java | 22 +++++-
.../management/pe/PipelineElementManager.java | 2 +-
.../http/InvocableEntityUrlGenerator.java | 83 +++++++++-------------
.../runtime/PipelineElementRuntimeInfoFetcher.java | 32 ++++-----
4 files changed, 72 insertions(+), 67 deletions(-)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
index 4f9947f..fb77e4b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
@@ -17,7 +17,10 @@ package org.apache.streampipes.node.controller.container.api;/*
*/
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.declarer.InvocableDeclarer;
+import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.container.transform.Transformer;
+import org.apache.streampipes.container.util.Util;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -27,6 +30,8 @@ import org.apache.streampipes.node.controller.container.management.pe.PipelineEl
import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
import org.eclipse.paho.client.mqttv3.MqttException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@@ -35,6 +40,9 @@ import java.io.IOException;
@Path("/node/container")
public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NodeControllerResource.class.getCanonicalName());
+
private static final String COLON = ":";
@@ -97,8 +105,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
// eventRelayManager.start();
// RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
- pipelineElementEndpoint = graph.getBelongsTo();
- PipelineElementManager.getInstance().invokePipelineElement(pipelineElementEndpoint, payload);
+ PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
}
else if (identifier.equals("sec")) {
graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
@@ -115,6 +122,17 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
.build();
}
+ // TODO move endpoint to /elementId/instances/runningInstanceId
+ @DELETE
+ @Path("{elementId}/{runningInstanceId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public String detach(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+
+ LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
+
+ return Util.toResponseString(elementId, false, "Could not find the running instance with id: " + runningInstanceId);
+ }
+
@POST
@Path("/detach")
@Consumes(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
index 5c41916..2d529ed 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
@@ -56,7 +56,7 @@ public class PipelineElementManager {
* invokes pipeline elements when pipeline is started
*/
public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
- LOG.info("Invoking element: {}" + pipelineElementEndpoint);
+ LOG.info("Invoking element: {}", pipelineElementEndpoint);
try {
Response httpResp = Request
.Post(pipelineElementEndpoint)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 2e6cff8..2999aca 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -28,15 +28,15 @@ import java.util.Optional;
public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> {
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
protected static final String SINK_IDENTIFIER = "sec";
protected static final String PROCESSOR_IDENTIFIER = "sepa";
-
private static final String DEFAULT_NODE_ID = "default";
- private static final String COLON = ":";
private static final String PE_PORT_KEY = "SP_PORT";
private static final String PE_HOST_KEY = "SP_HOST";
- private static final String NODE_CONTROLLER_ROUTE = "node/container/invoke";
+ private static final String NODE_CONTROLLER_ROUTE = "node/container";
public InvocableEntityUrlGenerator(InvocableStreamPipesEntity pipelineElement) {
super(pipelineElement);
@@ -44,26 +44,32 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
@Override
public String generateStartPipelineEndpointUrl() {
-// // TODO: normal setup
-// return URLPREFIX
-// + getHost()
-// + SLASH
-// + getIdentifier()
-// + SLASH
-// + pipelineElement.getAppId();
- // TODO: uncomment for edge tests
- return URLPREFIX
- + getHost()
- + SLASH
- + NODE_CONTROLLER_ROUTE
- + SLASH
- + getIdentifier()
- + SLASH
- + pipelineElement.getAppId();
+ if (pipelineElement.getDeploymentTargetNodeId() == null ||
+ pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
+ // default deployments to primary pipeline element
+ return URLPREFIX
+ + getHost()
+ + SLASH
+ + getIdentifier()
+ + SLASH
+ + pipelineElement.getAppId();
+ } else {
+ // edge deployments to secondary pipeline element
+ return URLPREFIX
+ + getHost()
+ + SLASH
+ + NODE_CONTROLLER_ROUTE
+ + "invoke"
+ + SLASH
+ + getIdentifier()
+ + SLASH
+ + pipelineElement.getAppId();
+ }
}
@Override
public String generateStopPipelineEndpointUrl() {
+ // TODO: handle stop requests
return generateStartPipelineEndpointUrl()
+ SLASH
+ pipelineElement.getDeploymentRunningInstanceId();
@@ -87,10 +93,16 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
+ pipelineElement.getDeploymentTargetNodeId()
+ SLASH;
- // Needed because secondary PE uses information from primary
- pipelineElement.setElementEndpointPort(ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY));
- pipelineElement.setElementEndpointHostname(ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY));
- // TODO: adapt elementId + belongsTo
+ String host = ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY);
+ int port = ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY);
+
+ // Necessary because secondary pipeline element description is not stored in backend
+ // It uses information from primary pipeline element. Node controller will locally forward
+ // request accordingly, thus fields must be correct.
+ pipelineElement.setElementEndpointHostname(host);
+ pipelineElement.setElementEndpointPort(port);
+ pipelineElement.setBelongsTo(URLPREFIX + host + COLON + port + SLASH + getIdentifier() + SLASH + pipelineElement.getAppId());
+ pipelineElement.setElementId(pipelineElement.getBelongsTo() + SLASH + pipelineElement.getDeploymentRunningInstanceId());
return pipelineElement.getDeploymentTargetNodeHostname()
+ COLON
@@ -100,31 +112,6 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
return defaultHost();
}
}
-// else {
-// Optional<NodeInfo> nodeInfoOpt = getNodeInfo();
-// if (nodeInfoOpt.isPresent()) {
-// NodeInfo nodeInfo = nodeInfoOpt.get();
-// // TODO: get port from Consul
-// String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
-// + pipelineElement.getElementEndpointServiceName()
-// + SLASH
-// + ConsulSpConfig.BASE_PREFIX
-// + SLASH
-// + ConsulSpConfig.SECONDARY_NODE_KEY
-// + SLASH
-// + nodeInfo.getNodeControllerId()
-// + SLASH
-// + PE_PORT_KEY;
-//
-// return nodeInfo.getNodeMetadata().getNodeAddress()
-// + COLON
-// + nodeInfo.getNodeControllerPort();
-// //+ ConsulUtil.getPortForService(route);
-// }
-// else {
-// return defaultHost();
-// }
-// }
}
private String defaultHost() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index d67c3f5..48ba076 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -35,11 +35,11 @@ import java.util.HashMap;
import java.util.Map;
public enum PipelineElementRuntimeInfoFetcher {
-
INSTANCE;
Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
+ private int FETCH_INTERVAL_MS = 300;
private Map<String, SpDataFormatConverter> converterMap;
PipelineElementRuntimeInfoFetcher() {
@@ -58,6 +58,18 @@ public enum PipelineElementRuntimeInfoFetcher {
}
}
+ private TransportFormat getTransportFormat(SpDataStream spDataStream) {
+ return spDataStream.getEventGrounding().getTransportFormats().get(0);
+ }
+
+ private String getOutputTopic(SpDataStream spDataStream) {
+ return spDataStream
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
final String[] result = {null};
final String topic = getOutputTopic(spDataStream);
@@ -80,7 +92,7 @@ public enum PipelineElementRuntimeInfoFetcher {
while (result[0] == null) {
try {
- Thread.sleep(300);
+ Thread.sleep(FETCH_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -89,18 +101,6 @@ public enum PipelineElementRuntimeInfoFetcher {
return result[0];
}
- private TransportFormat getTransportFormat(SpDataStream spDataStream) {
- return spDataStream.getEventGrounding().getTransportFormats().get(0);
- }
-
- private String getOutputTopic(SpDataStream spDataStream) {
- return spDataStream
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
private String getLatestEventFromMqtt(SpDataStream spDataStream) throws SpRuntimeException {
final String[] result = {null};
String mqttTopic = getOutputTopic(spDataStream);
@@ -124,7 +124,7 @@ public enum PipelineElementRuntimeInfoFetcher {
while (result[0] == null) {
try {
- Thread.sleep(300);
+ Thread.sleep(FETCH_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -166,7 +166,7 @@ public enum PipelineElementRuntimeInfoFetcher {
long timeout = 0;
while (result[0] == null && timeout < 6000) {
try {
- Thread.sleep(300);
+ Thread.sleep(FETCH_INTERVAL_MS);
timeout = timeout + 300;
} catch (InterruptedException e) {
e.printStackTrace();