You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/25 23:06:34 UTC
[incubator-streampipes] branch edge-extensions updated: [WIP]
working edge deployments using node controller,
some refactoring on existing classes
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new ec16d76 [WIP] working edge deployments using node controller, some refactoring on existing classes
ec16d76 is described below
commit ec16d7653c18c97b0281d6115a7efecec5baec7a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Nov 26 00:01:13 2020 +0100
[WIP] working edge deployments using node controller, some refactoring on existing classes
---
.../streampipes/config/consul/ConsulSpConfig.java | 81 +++-
.../development/env | 0
.../master/init/AdapterMasterContainer.java | 0
.../standalone/init/StandaloneModelSubmitter.java | 2 +-
.../streampipes/container/util/ConsulUtil.java | 426 ++++++++++++---------
.../container/util/NodeControllerUtil.java | 27 +-
.../model/base/InvocableStreamPipesEntity.java | 19 -
.../model/graph/DataProcessorInvocation.java | 3 +
.../model/graph/DataSinkInvocation.java | 3 +
.../container/NodeControllerContainerInit.java | 10 +-
.../container/config/NodeControllerConfig.java | 4 +-
.../container/management/IRunningInstances.java | 3 +-
.../container/management/info/NodeInfoStorage.java | 5 +-
.../{node => janitor}/NodeJanitorManager.java | 3 +-
.../orchestrator/RunningContainerInstances.java | 3 +-
.../docker/DockerContainerOrchestrator.java | 5 +-
.../orchestrator/docker/DockerNodeContainer.java | 7 +-
.../orchestrator/docker/DockerUtils.java | 3 +-
.../ElementLifeCyle.java} | 16 +-
...ntManager.java => InvocableElementManager.java} | 85 ++--
.../RunningInvocableInstances.java} | 21 +-
.../management/relay/AbstractMqttKafkaBridge.java | 3 +-
.../relay/AbstractMqttKafkaConnector.java | 3 +-
.../management/relay/AbstractMqttKafkaRelay.java | 3 +-
.../management/relay/EventRelayManager.java | 3 +-
.../management/relay/MqttKafkaBridge.java | 4 +-
.../management/relay/RunningRelayInstances.java | 3 +-
.../container/management/relay/model/Metrics.java | 3 +-
.../container/management/relay/model/Relay.java | 3 +-
.../{resource => resources}/ResourceManager.java | 3 +-
.../container/rest/DebugRelayResource.java | 8 +-
.../container/rest/InfoStatusResource.java | 7 +-
...tResource.java => InvocableEntityResource.java} | 68 ++--
.../rest/NodeControllerResourceConfig.java | 4 +-
.../manager/endpoint/EndpointFetcher.java | 2 +-
.../execution/http/DataSetEntityUrlGenerator.java | 10 +-
.../execution/http/EndpointUrlGenerator.java | 9 +-
.../http/InvocableEntityUrlGenerator.java | 143 +++----
.../manager/matching/InvocationGraphBuilder.java | 9 +-
.../manager/node/AvailableNodesFetcher.java | 2 +-
.../apache/streampipes/rest/impl/ConsulConfig.java | 2 +-
.../app/core-model/gen/streampipes-model-client.ts | 2 +-
ui/src/app/core-model/gen/streampipes-model.ts | 33 +-
.../save-pipeline/save-pipeline.component.ts | 2 -
44 files changed, 581 insertions(+), 474 deletions(-)
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
index 80226c1..e76a36d 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
@@ -26,8 +26,12 @@ import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.config.SpConfigChangeCallback;
import org.apache.streampipes.config.model.ConfigItem;
import org.apache.streampipes.config.model.ConfigurationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.net.Socket;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,9 +40,11 @@ import java.util.Map;
import java.util.Optional;
public class ConsulSpConfig extends SpConfig implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class.getCanonicalName());
private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
- private static final String NODE_ID_ENV_KEY = "SP_NODE_ID";
+ private static final int CONSUL_DEFAULT_PORT = 8500;
+ private static final String ENV_NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
public static final String BASE_PREFIX = "base";
@@ -48,7 +54,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
public static final String SECONDARY_NODE_KEY = "secondary";
private String serviceName;
- private KeyValueClient kvClient;
+ private KeyValueClient kvClient;
private List<String> baseConfigKeys = Arrays.asList("SP_HOST", "SP_PORT");
@@ -59,32 +65,69 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
public ConsulSpConfig(String serviceName) {
super(serviceName);
- //TDOO use consul adress from an environment variable
+ Consul consul = consulInstance();
+ this.kvClient = consul.keyValueClient();
+ this.serviceName = serviceName;
+ }
+
+ public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
+ this(serviceName);
+ this.callback = callback;
+ this.configProps = new HashMap<>();
+ new Thread(this).start();
+ }
+
+ private static Consul consulInstance() {
+ boolean connected = false;
+ URL consulUrl = consulURL();
+
+ while (!connected) {
+ LOG.info("Trying to connect to Consul to register config items");
+ connected = isReady(consulUrl.getHost(), consulUrl.getPort());
+
+ if (!connected) {
+ LOG.info("Retrying in 1 second");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("Successfully connected to Consul");
+ return Consul.builder().withUrl(consulURL()).build();
+ }
+
+ private static URL consulURL() {
Map<String, String> env = System.getenv();
- Consul consul;
+ URL url = null;
+
if (env.containsKey(CONSUL_ENV_LOCATION)) {
- URL url = null;
try {
- url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+ url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
- consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
} else {
- consul = Consul.builder().build();
+ try {
+ url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
}
-
-// Consul consul = Consul.builder().build(); // connect to Consul on localhost
- kvClient = consul.keyValueClient();
-
- this.serviceName = serviceName;
+ return url;
}
- public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
- this(serviceName);
- this.callback = callback;
- this.configProps = new HashMap<>();
- new Thread(this).start();
+ public static boolean isReady(String host, int port) {
+ try {
+ InetSocketAddress sa = new InetSocketAddress(host, port);
+ Socket ss = new Socket();
+ ss.connect(sa, 1000);
+ ss.close();
+ } catch(Exception e) {
+ return false;
+ }
+ return true;
}
@Override
@@ -249,7 +292,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
private String addSn(String key) {
String configAppendix;
if (this.baseConfigKeys.contains(key)) {
- String nodeId = System.getenv(NODE_ID_ENV_KEY);
+ String nodeId = System.getenv(ENV_NODE_CONTROLLER_ID_KEY);
if (nodeId == null) {
//nodeId = PRIMARY_NODE_KEY;
configAppendix = BASE_PREFIX + SLASH + PRIMARY_NODE_KEY;
diff --git a/streampipes-connect-container-master/development/env b/streampipes-connect-container-master/development/env
deleted file mode 100644
index e69de29..0000000
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/init/AdapterMasterContainer.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/init/AdapterMasterContainer.java
deleted file mode 100644
index e69de29..0000000
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 0d28466..9de5af6 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -58,7 +58,7 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter {
app.run();
// check wether pipeline element is managed by node controller
- if (System.getenv("SP_NODE_ID") != null) {
+ if (System.getenv("SP_NODE_CONTROLLER_ID") != null) {
// secondary
// register pipeline element service via node controller
NodeControllerUtil.register(
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index d90df3d..a2f1b45 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -18,8 +18,7 @@
package org.apache.streampipes.container.util;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.orbitz.consul.AgentClient;
import com.orbitz.consul.Consul;
import com.orbitz.consul.HealthClient;
@@ -31,11 +30,12 @@ import com.orbitz.consul.model.health.ServiceHealth;
import com.orbitz.consul.model.kv.Value;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
-import org.apache.streampipes.config.consul.ConsulSpConfig;
import org.apache.streampipes.config.model.ConfigItem;
+import org.apache.streampipes.container.model.PeConfig;
import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,104 +49,201 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class ConsulUtil {
- private static final String PROTOCOL = "http://";
+ private static final Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
private static final String HEALTH_CHECK_INTERVAL = "10s";
- //private static final String HEALTH_CHECK_TTL = "15s";
- //private static final String CONSUL_DEREGISTER_SERIVER_AFTER = "10s";
- private static final String PE_SERVICE_NAME = "pe";
- private static final String NODE_SERVICE_NAME = "node";
-
+ private static final String PE_SVC_TAG = "pe";
+ private static final String NODE_SVC_TAG = "node";
private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+ private static final int CONSUL_DEFAULT_PORT = 8500;
+ private static final String CONSUL_NAMESPACE = "/sp/v1/";
private static final String CONSUL_URL_REGISTER_SERVICE = "v1/agent/service/register";
-
- private static final String PRIMARY_PE_IDENTIFIER = "primary";
- private static final String SECONDARY_PE_IDENTIFIER = "secondary";
- private static final String NODE_ID_IDENTIFIER = "SP_NODE_ID";
- private static final String SLASH = "/";
-
- static Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
+ private static final String PRIMARY_PE_IDENTIFIER_TAG = "primary";
+ private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
+ private static final String ENV_NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
public static Consul consulInstance() {
return Consul.builder().withUrl(consulURL()).build();
}
- public static void registerPeService(String serviceID, String url, int port) {
- String serviceLocationTag = System.getenv(NODE_ID_IDENTIFIER) == null ? PRIMARY_PE_IDENTIFIER : SECONDARY_PE_IDENTIFIER;
- String uniquePEServiceId = url + SLASH + serviceID;
- registerService(PE_SERVICE_NAME, uniquePEServiceId, url, port, Arrays.asList("pe", serviceLocationTag));
+ /**
+ * Method called by {@link org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter} to register
+ * new pipeline element service endpoint.
+ *
+ * @param svcId unique service id
+ * @param host host address of pipeline element service endpoint
+ * @param port port of pipeline element service endpoint
+ */
+ public static void registerPeService(String svcId, String host, int port) {
+ registerService(PE_SVC_TAG, makeUniqueSvcId(host, svcId), host, port, makeSvcTags());
}
- public static void registerService(String serviceName, String serviceID, String url, int port, String tag) {
- registerService(serviceName, serviceID, url, port, Collections.singletonList(tag));
+ /**
+ * Method called by {@link org.apache.streampipes.node.controller.container.NodeControllerContainerInit} to
+ * register new node controller service endpoint.
+ *
+ * @param svcId unique service id
+ * @param host host address of node controller service endpoint
+ * @param port port of node controller service endpoint
+ */
+ public static void registerNodeService(String svcId, String host, int port) {
+ registerService(NODE_SVC_TAG, makeUniqueSvcId(host, svcId), host, port, Collections.singletonList(NODE_SVC_TAG));
}
- public static void registerService(String serviceName, String serviceID, String url, int port, List<String> tag) {
- String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
- try {
- registerServiceHttpClient(body);
- LOG.info("Register service " + serviceID +" successful");
- } catch (IOException e) {
- LOG.error("Register service: " + serviceID, " - " + e.toString());
+ /**
+ * Register service at Consul.
+ *
+ * @param svcGroup service group for registered service
+ * @param svcId unique service id
+ * @param host host address of service endpoint
+ * @param port port of service endpoint
+ * @param tags tags of service
+ */
+ public static void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+ boolean connected = false;
+
+ while (!connected) {
+ LOG.info("Trying to register service at Consul: " + svcId);
+ ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+ connected = registerServiceHttpClient(svcRegistration);
+
+ if (!connected) {
+ LOG.info("Retrying in 1 second");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
+ LOG.info("Successfully registered service at Consul: " + svcId);
}
- public static void registerNodeControllerService(String serviceID, String url, int port) {
- String uniqueNodeServiceId = url + SLASH + serviceID;
- registerService(NODE_SERVICE_NAME, uniqueNodeServiceId, url, port, "node");
- }
-
- //NOT TESTED
- /* public static void subcribeHealthService() {
- Consul consul = consulInstance();
- HealthClient healthClient = consul.healthClient();
- Agent agent = consul.agentClient().getAgent();
-
+ /**
+ * PUT REST call to Consul API to register new service.
+ *
+ * @param svcRegistration service registration object used to register service endpoint
+ * @return success or failure of service registration
+ */
+ private static boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
+ try {
+ String endpoint = makeConsulEndpoint();
+ String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
- ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, PE_SERVICE_NAME);
+ Request.Put(endpoint)
+ .addHeader("accept", "application/json")
+ .body(new StringEntity(body))
+ .execute();
- svHealth.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
- @Override
- public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
- System.out.println("ad");
- }
- });
+ return true;
+ } catch (IOException e) {
+ LOG.error("Could not register service at Consul");
}
- */
+ return false;
+ }
- public static Map<String, Tuple2<String, String>> getPEServices() {
- LOG.info("Load PE service status");
+ // GET methods
+
+ /**
+ * Get all pipeline element service endpoints
+ *
+ * @return list of pipline element service endpoints
+ */
+ public static Map<String, Tuple2<String, String>> getPeServices() {
+ LOG.info("Load pipeline element service status");
Consul consul = consulInstance();
AgentClient agent = consul.agentClient();
Map<String, Service> services = consul.agentClient().getServices();
Map<String, HealthCheck> checks = agent.getChecks();
- Map<String, Tuple2<String,String>> peServices = new HashMap<>();
+ Map<String, Tuple2<String,String>> peSvcs = new HashMap<>();
for (Map.Entry<String, Service> entry : services.entrySet()) {
- if (entry.getValue().getTags().contains(PE_SERVICE_NAME)) {
- String serviceId = entry.getValue().getId();
- String serviceStatus = "critical";
- String serviceTag = "primary";
+ if (entry.getValue().getTags().contains(PE_SVC_TAG)) {
+ String svcId = entry.getValue().getId();
+
+ String svcStatus = "critical";
+ String svcTag = PRIMARY_PE_IDENTIFIER_TAG;
+
if (checks.containsKey("service:" + entry.getKey())) {
- serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
- if (checks.get("service:" + entry.getKey()).getServiceTags().stream().noneMatch(e -> e.contains("primary"))) {
- serviceTag = "secondary";
+ svcStatus = checks.get("service:" + entry.getKey()).getStatus();
+ if (checks.get("service:" + entry.getKey())
+ .getServiceTags().stream().noneMatch(e -> e.contains(PRIMARY_PE_IDENTIFIER_TAG))) {
+ svcTag = SECONDARY_PE_IDENTIFIER_TAG;
}
}
- LOG.info("Service id: " + serviceId + " service status: " + serviceStatus + " service tag: " + serviceTag);
- peServices.put(serviceId, new Tuple2(serviceStatus, serviceTag));
+ LOG.info("Service id: " + svcId + " service status: " + svcStatus + " service tag: " + svcTag);
+ peSvcs.put(svcId, new Tuple2<>(svcStatus, svcTag));
}
}
- return peServices;
+ return peSvcs;
+ }
+
+ /**
+ * Get active pipeline element service endpoints
+ *
+ * @return list of pipeline element endpoints
+ */
+ public static List<String> getActivePeEndpoints() {
+ LOG.info("Load active pipeline element service endpoints");
+ return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PRIMARY_PE_IDENTIFIER_TAG));
}
+ /**
+ * Get active node controller service endpoints
+ *
+ * @return list of active node controller endpoints
+ */
+ public static List<String> getActiveNodeEndpoints() {
+ LOG.info("Load active node service endpoints");
+ return getServiceEndpoints(NODE_SVC_TAG, true, new ArrayList<>());
+ }
+
+
+ /**
+ * Get service endpoints
+ *
+ * @param svcGroup service group for registered service
+ * @param restrictToHealthy retrieve healthy or all registered services for a service group
+ * @param filterByTags filter param to filter list of registered services
+ * @return list of services
+ */
+ public static List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+ List<String> filterByTags) {
+ Consul consul = consulInstance();
+ HealthClient healthClient = consul.healthClient();
+ List<String> endpoints = new LinkedList<>();
+ List<ServiceHealth> nodes;
+
+ if (!restrictToHealthy) {
+ nodes = healthClient.getAllServiceInstances(svcGroup).getResponse();
+ } else {
+ nodes = healthClient.getHealthyServiceInstances(svcGroup).getResponse();
+ }
+ for (ServiceHealth node : nodes) {
+ if (node.getService().getTags().containsAll(filterByTags)) {
+ String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
+ LOG.info("Active " + svcGroup + " endpoint: " + endpoint);
+ endpoints.add(endpoint);
+ }
+ }
+ return endpoints;
+ }
+
+ /**
+ * Get key-value entries for a given route
+ *
+ * @param route route to retrieve key-value entries in Consul
+ * @return key-value entries
+ */
public static Map<String, String> getKeyValue(String route) {
Consul consul = consulInstance();
KeyValueClient keyValueClient = consul.keyValueClient();
@@ -162,147 +259,80 @@ public class ConsulUtil {
if (value.getValueAsString().isPresent()) {
v = value.getValueAsString().get();
}
- LOG.info("Load key: " + route + " value: " + v);
keyValues.put(key, v);
}
}
return keyValues;
}
-
- public static int getIntValue(String route) {
- String value = getKeyValue(route)
- .values()
- .stream()
- .findFirst()
- .get();
-
-// List<String> list = ConsulUtil.getKeyValue(route)
-// .entrySet()
-// .stream()
-// .map(m -> {
-// return new Gson().fromJson(m.getValue(), ConfigItem.class).getValue();
-// })
-// .collect(Collectors.toList());
-
- return Integer.parseInt(new Gson().fromJson(value, ConfigItem.class).getValue());
+ /**
+ * Get specific value for a key in route
+ *
+ * @param route route to retrieve value
+ * @param type data type of return value, e.g. Integer.class, String.class
+ * @return value for key
+ */
+ public static <T> T getValueForRoute(String route, Class<T> type) {
+ try {
+ String entry = getKeyValue(route)
+ .values()
+ .stream()
+ .findFirst()
+ .orElse(null);
+
+ if (type.equals(Integer.class)) {
+ return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ } else if (type.equals(Boolean.class)) {
+ return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ } else {
+ return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ }
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ throw new IllegalArgumentException("Cannot get entry from Consul");
}
- public static String getStringValue(String route) {
- String value = getKeyValue(route)
- .values()
- .stream()
- .findFirst()
- .get();
-
- return new Gson().fromJson(value, ConfigItem.class).getValue();
- }
+ // Update
+ /**
+ * Update key-value config in Consul
+ *
+ * @param key key to be updated
+ * @param entry new entry
+ * @param password wether value is a password, here only non-sensitive values are updated
+ */
public static void updateConfig(String key, String entry, boolean password) {
Consul consul = consulInstance();
- KeyValueClient keyValueClient = consul.keyValueClient();
-
if (!password) {
- keyValueClient.putValue(key, entry);
+ LOG.info("Updated config - key:" + key + " value: " + entry);
+ consul.keyValueClient().putValue(key, entry);
}
-
-// keyValueClient.putValue(key + "_description", description);
-// keyValueClient.putValue(key + "_type", valueType);
- LOG.info("Updated config - key:" + key +
- " value: " + entry);
-// +
-// " description: " + description +
-// " type: " + valueType);
}
- public static List<String> getActivePEServicesEndPoints() {
- LOG.info("Load active PE service endpoints");
- return getServiceEndpoints(PE_SERVICE_NAME, true, Collections.singletonList(PRIMARY_PE_IDENTIFIER));
- }
-
- public static List<String> getActiveNodeEndpoints() {
- LOG.info("Load active node service endpoints");
- // TODO set restrictToHealthy to true, this is just for debugging
- return getServiceEndpoints(NODE_SERVICE_NAME, false, new ArrayList<>());
- }
-
- public static List<String> getServiceEndpoints(String serviceGroup, boolean restrictToHealthy,
- List<String> filterByTags) {
+ /**
+ * Deregister registered service endpoint in Consul
+ *
+ * @param svcId service id of endpoint to be deregistered
+ */
+ public static void deregisterService(String svcId) {
Consul consul = consulInstance();
- HealthClient healthClient = consul.healthClient();
- List<String> endpoints = new LinkedList<>();
-
- List<ServiceHealth> nodes;
- if (!restrictToHealthy) {
- nodes = healthClient.getAllServiceInstances(serviceGroup).getResponse();
- } else {
- nodes = healthClient.getHealthyServiceInstances(serviceGroup).getResponse();
- }
- for (ServiceHealth node : nodes) {
- if (node.getService().getTags().containsAll(filterByTags)) {
- String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
- LOG.info("Active " + serviceGroup + " endpoint: " + endpoint);
- endpoints.add(endpoint);
- }
- }
- return endpoints;
+ LOG.info("Deregister service: " + svcId);
+ consul.agentClient().deregister(svcId);
}
- public static void deregisterService(String serviceId) {
+ /**
+ * Delete config in Consul
+ *
+ * @param key key to be deleted
+ */
+ public static void deleteConfig(String key) {
Consul consul = consulInstance();
-
- LOG.info("Deregister Service: " + serviceId);
- consul.agentClient().deregister(serviceId);
+ LOG.info("Delete config: {}", key);
+ consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
}
- public static void deleteKeys(String serviceId) {
- Consul consul = consulInstance();
-
- LOG.info("Delete keys: {}", serviceId);
- // TODO: namespace should not be hardcoded
- consul.keyValueClient().deleteKeys("/sp/v1/" + serviceId);
- }
-
- private static int registerServiceHttpClient(String body) throws IOException {
- return Request.Put(consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE)
- .addHeader("accept", "application/json")
- .body(new StringEntity(body))
- .execute()
- .returnResponse()
- .getStatusLine().getStatusCode();
- }
-
- private static String createServiceRegisterBody(String name, String id, String url, int port, List<String> tags) {
- String healthCheckURL = PROTOCOL + url + ":" + port;
- ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
- body.setID(id);
- body.setName(name);
- body.setTags(tags);
- body.setAddress(PROTOCOL + url);
- body.setPort(port);
- body.setEnableTagOverride(true);
- body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
-
- return new Gson().toJson(body);
-
-// return "{" +
-// "\"ID\": \"" + id + "\"," +
-// "\"Name\": \"" + name + "\"," +
-// "\"Tags\": [" +
-// " \"" + tag + "\"" + ",\"urlprefix-/" + id + " strip=/" + id + "\"" +
-// " ]," +
-// " \"Address\": \"" + PROTOCOL + url + "\"," +
-// " \"Port\":" + port + "," +
-// " \"EnableTagOverride\": true" + "," +
-// "\"Check\": {" +
-// " \"Method\": \"GET\"" + "," +
-// " \"http\":" + "\"" + healthCheckURL + "\"," +
-// // " \"DeregisterCriticalServiceAfter\":" + "\"" + CONSUL_DEREGISTER_SERIVER_AFTER + "\"," +
-// " \"interval\":" + "\"" + HEALTH_CHECK_INTERVAL + "\"" + //"," +
-// //" \"TTL\":" + "\"" + HEALTH_CHECK_TTL + "\"" +
-// " }" +
-// "}";
- }
+ // Helper methods
private static URL consulURL() {
Map<String, String> env = System.getenv();
@@ -310,17 +340,45 @@ public class ConsulUtil {
if (env.containsKey(CONSUL_ENV_LOCATION)) {
try {
- url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+ url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
} else {
try {
- url = new URL("http", "localhost", 8500, "");
+ url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
return url;
}
+
+ private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
+ int port, List<String> tags) {
+ ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+ body.setID(id);
+ body.setName(svcGroup);
+ body.setTags(tags);
+ body.setAddress(HTTP_PROTOCOL + host);
+ body.setPort(port);
+ body.setEnableTagOverride(true);
+ body.setCheck(new HealthCheckConfiguration("GET",
+ (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+
+ return body;
+ }
+
+ private static String makeUniqueSvcId(String host, String serviceID) {
+ return host + SLASH + serviceID;
+ }
+
+ private static List<String> makeSvcTags() {
+ return Arrays.asList(PE_SVC_TAG, System.getenv(ENV_NODE_CONTROLLER_ID_KEY) == null ?
+ PRIMARY_PE_IDENTIFIER_TAG : SECONDARY_PE_IDENTIFIER_TAG);
+ }
+
+ private static String makeConsulEndpoint() {
+ return consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE;
+ }
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index fd99736..db959d2 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -33,28 +33,27 @@ import java.util.*;
public class NodeControllerUtil {
static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
- private static final String PROTOCOL = "http://";
+ private static final String HTTP_PROTOCOL = "http://";
private static final String COLON = ":";
private static final String SLASH = "/";
private static final String HEALTH_CHECK_INTERVAL = "10s";
- private static final String PE_SERVICE_NAME = "pe";
- private static final String PE_SECONDARY_TAG = "secondary";
- private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "node/container/register";
+ private static final String PE_TAG = "pe";
+ private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
+ private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/container/register";
public static void register(String serviceID, String host, int port,
Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
- register(PE_SERVICE_NAME, makeSvcId(host, serviceID), host, port,
- Arrays.asList(PE_SERVICE_NAME, PE_SECONDARY_TAG), epaDeclarers);
+ register(PE_TAG, makeSvcId(host, serviceID), host, port,
+ Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
}
- public static void register(String svcName, String svcId, String url, int port, List<String> tag,
+ public static void register(String svcName, String svcId, String host, int port, List<String> tag,
Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
-
boolean connected = false;
while (!connected) {
LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
- String body = createSvcBody(svcName, svcId, url, port, tag, epaDeclarers);
+ String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
connected = registerSvcHttpClient(body);
if (!connected) {
@@ -84,15 +83,15 @@ public class NodeControllerUtil {
return false;
}
- private static String createSvcBody(String name, String id, String url, int port, List<String> tags,
+ private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
try {
ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
- String healthCheckURL = PROTOCOL + url + COLON + port;
+ String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
body.setID(id);
body.setName(name);
body.setTags(tags);
- body.setAddress(PROTOCOL + url);
+ body.setAddress(HTTP_PROTOCOL + host);
body.setPort(port);
body.setEnableTagOverride(true);
body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
@@ -110,14 +109,14 @@ public class NodeControllerUtil {
private static String makeRegistrationEndpoint() {
if (System.getenv("SP_NODE_CONTROLLER_HOST") != null) {
- return PROTOCOL
+ return HTTP_PROTOCOL
+ System.getenv("SP_NODE_CONTROLLER_HOST")
+ COLON
+ System.getenv("SP_NODE_CONTROLLER_PORT")
+ SLASH
+ NODE_CONTROLLER_REGISTER_SVC_URL;
} else {
- return PROTOCOL
+ return HTTP_PROTOCOL
+ "localhost"
+ COLON
+ "7077"
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 3d86c5b..8d38c28 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -129,25 +129,6 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
}
}
-// TODO: delete if not needed after merge
-// public InvocableStreamPipesEntity(ConsumableStreamPipesEntity entityDescription) {
-// super();
-// this.setName(entityDescription.getName());
-// this.setDescription(entityDescription.getDescription());
-// this.setIconUrl(entityDescription.getIconUrl());
-// this.setInputStreams(entityDescription.getSpDataStreams());
-// this.setSupportedGrounding(entityDescription.getSupportedGrounding());
-// this.setStaticProperties(entityDescription.getStaticProperties());
-// this.setBelongsTo(entityDescription.getElementId());
-// this.setStreamRequirements(entityDescription.getSpDataStreams());
-// this.setAppId(entityDescription.getAppId());
-// this.setIncludesAssets(entityDescription.isIncludesAssets());
-//
-// this.setElementEndpointHostname(entityDescription.getElementEndpointHostname());
-// this.setElementEndpointPort(entityDescription.getElementEndpointPort());
-// this.setElementEndpointServiceName(entityDescription.getElementEndpointServiceName());
-// }
-
public InvocableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
super(uri, name, description, iconUrl);
this.configured = false;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index fbaafe7..e224806 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -71,6 +71,9 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
this.setAppId(sepa.getAppId());
this.setIncludesAssets(sepa.isIncludesAssets());
this.setElementId(RdfIdGenerator.makeRdfId(this));
+ this.setElementEndpointServiceName(sepa.getElementEndpointServiceName());
+ this.setElementEndpointHostname(sepa.getElementEndpointHostname());
+ this.setElementEndpointPort(sepa.getElementEndpointPort());
//this.setUri(belongsTo +"/" +getElementId());
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index a07dc21..17449f7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -62,6 +62,9 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity {
this.setAppId(sec.getAppId());
this.setIncludesAssets(sec.isIncludesAssets());
this.setElementId(RdfIdGenerator.makeRdfId(this));
+ this.setElementEndpointServiceName(sec.getElementEndpointServiceName());
+ this.setElementEndpointHostname(sec.getElementEndpointHostname());
+ this.setElementEndpointPort(sec.getElementEndpointPort());
//this.setUri(belongsTo +"/" +getElementId());
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
index ece533b..4ff12ac 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,16 +15,15 @@ package org.apache.streampipes.node.controller.container;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container;
import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.model.node.PipelineElementDockerContainer;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.node.NodeJanitorManager;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.container.management.janitor.NodeJanitorManager;
+import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -67,7 +65,7 @@ public class NodeControllerContainerInit {
}
// registration with consul here
- ConsulUtil.registerNodeControllerService(
+ ConsulUtil.registerNodeService(
nodeConfig.getNodeServiceId(),
nodeConfig.getNodeHostName(),
nodeConfig.getNodeControllerPort()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 139d88d..4368d6d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.config;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.config;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.config;
import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.model.node.resources.interfaces.AccessibleSensorActuatorResource;
@@ -29,7 +30,6 @@ public enum NodeControllerConfig {
private static final String SLASH = "/";
private static final String NODE_SERVICE_ID = "node/org.apache.streampipes.node.controller";
-
private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
private static final String DEFAULT_NODE_TYPE = "edge";
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
index e049a80..afb797d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management;
public interface IRunningInstances<T> {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 8c6f628..05f5278 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.info;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.info;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.info;
import org.apache.streampipes.model.node.*;
import org.apache.streampipes.model.node.resources.hardware.HardwareResource;
@@ -46,7 +47,7 @@ public class NodeInfoStorage {
private NodeInfo nodeInfo = new NodeInfo();
- private static DockerInfo DockerInfo = DockerUtils.getInstance().getDockerInfo();
+ private static final DockerInfo DockerInfo = DockerUtils.getInstance().getDockerInfo();
// OSHI to retreive system information
private static SystemInfo si = new SystemInfo();
private static HardwareAbstractionLayer hal = si.getHardware();
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
index 74deddc..31aee7f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.node;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.node;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.janitor;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 58a88b1..e3a09a6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
import org.apache.streampipes.node.controller.container.management.IRunningInstances;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
index 943a5bd..d45afcb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
@@ -97,7 +98,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
// deregister and delete kv pair in service in consul
ConsulUtil.deregisterService(p.getServiceId());
- ConsulUtil.deleteKeys(p.getServiceId());
+ ConsulUtil.deleteConfig(p.getServiceId());
ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
"message",
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
index ed5dd38..b58ba5c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
import org.apache.streampipes.model.node.PipelineElementDockerContainerBuilder;
@@ -39,8 +40,8 @@ public enum DockerNodeContainer {
.withName("streampipes_pipeline-elements-all-jvm")
.withExposedPorts(new String[]{"7023"})
.withEnvironmentVariables(Arrays.asList(
- "SP_NODE_ID=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
- "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+ "SP_NODE_CONTROLLER_ID=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+ "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
"SP_NODE_CONTROLLER_PORT=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
))
.withLabels(new HashMap<String,String>(){{
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
index 8793685..3a0a10c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
similarity index 74%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
index e049a80..7d0a0bb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +15,18 @@ package org.apache.streampipes.node.controller.container.management;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.pe;
-public interface IRunningInstances<T> {
+import org.apache.streampipes.container.model.node.InvocableRegistration;
- void add(String id, T value);
+public interface ElementLifeCyle {
- boolean isRunning(String id);
+ void register(InvocableRegistration registration);
- T get(String id);
+ String invoke(String endpoint, String payload);
+
+ String detach(String runningInstanceId);
+
+ void unregister();
- void remove(String id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
similarity index 59%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index cc648f4..0cb079e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.pe;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,7 +15,9 @@ package org.apache.streampipes.node.controller.container.management.pe;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.pe;
+import com.google.gson.Gson;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
@@ -29,84 +31,93 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
-public class PipelineElementManager {
+public class InvocableElementManager implements ElementLifeCyle {
private static final Logger LOG =
- LoggerFactory.getLogger(PipelineElementManager.class.getCanonicalName());
+ LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
- private static final String PROTOCOL = "http://";
+ private static final String HTTP_PROTOCOL = "http://";
private static final String COLON = ":";
private static final String SLASH = "/";
private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
private static final Integer CONNECT_TIMEOUT = 10000;
- private static PipelineElementManager instance = null;
+ private static InvocableElementManager instance = null;
- private PipelineElementManager() {}
+ private InvocableElementManager() {}
- public static PipelineElementManager getInstance() {
+ public static InvocableElementManager getInstance() {
if (instance == null) {
- synchronized (PipelineElementManager.class) {
+ synchronized (InvocableElementManager.class) {
if (instance == null)
- instance = new PipelineElementManager();
+ instance = new InvocableElementManager();
}
}
return instance;
}
- /**
- * Register pipeline element container
- *
- * @param invocableRegistration
- */
- public void registerPipelineElements(InvocableRegistration invocableRegistration) {
+ @Override
+ public void register(InvocableRegistration registration) {
try {
Request.Put(makeConsulRegistrationEndpoint())
.addHeader("accept", "application/json")
.body(new StringEntity(JacksonSerializer
.getObjectMapper()
- .writeValueAsString(invocableRegistration.getConsulServiceRegistrationBody())))
+ .writeValueAsString(registration.getConsulServiceRegistrationBody())))
.execute();
// TODO: persistent storage to survive failures
NodeInfoStorage.getInstance()
.retrieveNodeInfo()
- .setSupportedPipelineElementAppIds(invocableRegistration.getSupportedPipelineElementAppIds());
+ .setSupportedPipelineElementAppIds(registration.getSupportedPipelineElementAppIds());
} catch (IOException e) {
e.printStackTrace();
}
}
- /**
- * Invoke pipeline elements when pipeline is started
- *
- * @param pipelineElementEndpoint
- * @param payload
- * @return
- */
- public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
- LOG.info("Invoking element: {}", pipelineElementEndpoint);
+ @Override
+ public String invoke(String endpoint, String payload) {
+ LOG.info("Invoke pipeline element: {}", endpoint);
try {
Response httpResp = Request
- .Post(pipelineElementEndpoint)
+ .Post(endpoint)
.bodyString(payload, ContentType.APPLICATION_JSON)
.connectTimeout(CONNECT_TIMEOUT)
.execute();
- return httpResp.toString();
+
+ String resp = httpResp.returnContent().asString();
+ org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
+ org.apache.streampipes.model.Response.class);
+
+ return streamPipesResp.toString();
} catch (Exception e) {
LOG.error(e.getMessage());
}
- return "";
+ throw new IllegalArgumentException("Failed to invoke pipeline element: " + endpoint);
}
- /**
- * detaches pipeline elements when pipeline is stopped
- */
- // TODO: implement detach pe logic
- public void detachPipelineElement() {
+ @Override
+ public String detach(String endpoint) {
+ LOG.info("Detach pipeline element: {}", endpoint);
+ try {
+ Response httpResp = Request
+ .Delete(endpoint)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
+
+ String resp = httpResp.returnContent().asString();
+ org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
+ org.apache.streampipes.model.Response.class);
+ return streamPipesResp.toString();
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ throw new IllegalArgumentException("Failed to detach pipeline element: " + endpoint);
}
- public void unregisterPipelineElements(){
+ @Override
+ public void unregister(){
+ // TODO: unregister element from Consul and
NodeInfoStorage.getInstance()
.retrieveNodeInfo()
.setSupportedPipelineElementAppIds(Collections.emptyList());
@@ -114,14 +125,14 @@ public class PipelineElementManager {
private String makeConsulRegistrationEndpoint() {
if (System.getenv(ENV_CONSUL_LOCATION) != null) {
- return PROTOCOL
+ return HTTP_PROTOCOL
+ System.getenv(ENV_CONSUL_LOCATION)
+ COLON
+ "8500"
+ SLASH
+ "v1/agent/service/register";
} else {
- return PROTOCOL
+ return HTTP_PROTOCOL
+ "localhost"
+ COLON
+ "8500"
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
similarity index 66%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index 09e373f..3067f71 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,35 +15,36 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.pe;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.node.controller.container.management.IRunningInstances;
import java.util.HashMap;
import java.util.Map;
-public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
+public enum RunningInvocableInstances implements IRunningInstances<InvocableStreamPipesEntity> {
INSTANCE;
- private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+ private final Map<String, InvocableStreamPipesEntity> runningInvocableInstances = new HashMap<>();
- // TODO: persist active relays to support failure handling
@Override
- public void add(String id, EventRelayManager eventRelayManager) {
- runningInstances.put(id, eventRelayManager);
+ public void add(String id, InvocableStreamPipesEntity value) {
+ runningInvocableInstances.put(id, value);
}
@Override
public boolean isRunning(String id) {
- return runningInstances.get(id) != null;
+ return runningInvocableInstances.get(id) != null;
}
@Override
- public EventRelayManager get(String id) {
- return isRunning(id) ? runningInstances.get(id) : null;
+ public InvocableStreamPipesEntity get(String id) {
+ return runningInvocableInstances.get(id);
}
@Override
public void remove(String id) {
- runningInstances.remove(id);
+ runningInvocableInstances.remove(id);
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
index af15274..43993e2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import com.google.gson.Gson;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
index 1d59eb3..2f04f6e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import com.google.gson.Gson;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
index 41ff66d..768cdc2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.kafka.common.KafkaException;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
index d3a7af5..c8d0346 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
index 755715c..d91c151 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.kafka.common.KafkaException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -28,7 +29,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-
public class MqttKafkaBridge extends AbstractMqttKafkaConnector implements EventRelay<MqttTransportProtocol, KafkaTransportProtocol> {
private final static Logger LOG = LoggerFactory.getLogger(MqttKafkaBridge.class);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 09e373f..fc56a57 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay;
import org.apache.streampipes.node.controller.container.management.IRunningInstances;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
index 621be48..4d2dd34 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay.model;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay.model;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay.model;
public class Metrics {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
index 9a90aca..450cb33 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay.model;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay.model;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.relay.model;
public class Relay {
private final String sourceHost;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
index 0affe45..c34960d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.resource;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.resource;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.resources;
import com.google.gson.Gson;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
index 00dface..45bbe39 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.rest;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.rest;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.rest;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
@@ -24,11 +25,12 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
+@Path("/api/v2/relay")
public class DebugRelayResource extends AbstractNodeContainerResource {
// TODO: Debug-only.
@POST
- @Path("/relay/start")
+ @Path("/start")
public Response debugRelayEventStream(String msg) throws SpRuntimeException {
// TODO implement
@@ -41,7 +43,7 @@ public class DebugRelayResource extends AbstractNodeContainerResource {
}
@POST
- @Path("/relay/stop")
+ @Path("/stop")
public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
// TODO implement
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 40b3812..c02539c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.rest;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,8 +15,10 @@ package org.apache.streampipes.node.controller.container.rest;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.rest;
+
import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -25,7 +26,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/node")
+@Path("/api/v2/node")
public class InfoStatusResource extends AbstractNodeContainerResource{
@GET
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
similarity index 67%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index e1e302c..f1823fd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -17,16 +17,15 @@
*/
package org.apache.streampipes.node.controller.container.rest;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.container.transform.Transformer;
-import org.apache.streampipes.container.util.Util;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
+import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
+import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -38,9 +37,9 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
-@Path("/node/container")
-public class InvocableManagementResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
- private static final Logger LOG = LoggerFactory.getLogger(InvocableManagementResource.class.getCanonicalName());
+@Path("/api/v2/node/container")
+public class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+ private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -64,7 +63,7 @@ public class InvocableManagementResource<I extends InvocableStreamPipesEntity> e
.readValue(body, InvocableRegistration.class);
// register pipeline elements at consul and node controller
- PipelineElementManager.getInstance().registerPipelineElements(invocableRegistration);
+ InvocableElementManager.getInstance().register(invocableRegistration);
LOG.info("Sucessfully registered pipeline element container");
} catch (IOException e) {
@@ -76,67 +75,68 @@ public class InvocableManagementResource<I extends InvocableStreamPipesEntity> e
@Path("/invoke/{identifier}/{elementId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public Response invoke(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
+ public String invoke(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId, String payload) {
// TODO implement
- String pipelineElementEndpoint;
+ String endpoint;
InvocableStreamPipesEntity graph;
try {
if (identifier.equals("sepa")) {
graph = Transformer.fromJsonLd(DataProcessorInvocation.class, payload);
+ endpoint = graph.getBelongsTo();
// TODO: start event relay to remote broker
// EventRelayManager eventRelayManager = new EventRelayManager(graph);
// eventRelayManager.start();
// RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
- PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
- }
- else if (identifier.equals("sec")) {
+ RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
+
+ String resp = InvocableElementManager.getInstance().invoke(endpoint, payload);
+ return resp;
+
+ } else if (identifier.equals("sec")) {
graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
- pipelineElementEndpoint = graph.getBelongsTo();
- PipelineElementManager.getInstance().invokePipelineElement(pipelineElementEndpoint, payload);
+ endpoint = graph.getBelongsTo();
+
+ InvocableElementManager.getInstance().invoke(endpoint, payload);
}
- //pipelineElementEndpoint = graph.getElementEndpointHostname() + COLON + graph.getElementEndpointPort() + "/" + identifier + "/" + elementId;
} catch (IOException e) {
e.printStackTrace();
}
- return ok();
+ return "";
}
- // TODO move endpoint to /elementId/instances/runningInstanceId
@DELETE
- @Path("{elementId}/{runningInstanceId}")
+ @Path("/detach/{identifier}/{elementId}/{runningInstanceId}")
@Produces(MediaType.APPLICATION_JSON)
- public String detach(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+ public String detach(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId,
+ @PathParam("runningInstanceId") String runningInstanceId) {
LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
- return Util.toResponseString(elementId, false, "Could not find the running instance with id: " + runningInstanceId);
- }
+ // TODO store host and port locally to retrieve by runningInstanceId
- @POST
- @Path("/detach")
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public Response detachPipelineElement(String appId) throws SpRuntimeException {
- // TODO implement
+ String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
- // TODO: stop event relay to remote broker
- EventRelayManager relay = RunningRelayInstances.INSTANCE.get(appId);
- assert relay != null;
- relay.stop();
- RunningRelayInstances.INSTANCE.remove(appId);
+ String resp = InvocableElementManager.getInstance().detach(endpoint + "/" + runningInstanceId);
+ RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
+ return resp;
- return ok();
+ // TODO: stop event relay to remote broker
+// EventRelayManager relay = RunningRelayInstances.INSTANCE.get(elementId);
+// assert relay != null;
+// relay.stop();
+// RunningRelayInstances.INSTANCE.remove(elementId);
}
@DELETE
@Path("/remove")
@Consumes(MediaType.APPLICATION_JSON)
public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
- PipelineElementManager.getInstance().unregisterPipelineElements();
+ InvocableElementManager.getInstance().unregister();
return ok(DockerContainerOrchestrator.getInstance().remove(container));
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 2f6f06d..a18816c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.rest;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +15,7 @@ package org.apache.streampipes.node.controller.container.rest;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.rest;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.stereotype.Component;
@@ -26,7 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
public NodeControllerResourceConfig() {
register(HealthCheckResource.class);
register(InfoStatusResource.class);
- register(InvocableManagementResource.class);
+ register(InvocableEntityResource.class);
// TODO remove later - only for local relay tests
register(DebugRelayResource.class);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index 3604171..7959ce6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -30,7 +30,7 @@ import java.util.stream.Stream;
public class EndpointFetcher {
public List<RdfEndpoint> getEndpoints() {
- List<String> endpoints = ConsulUtil.getActivePEServicesEndPoints();
+ List<String> endpoints = ConsulUtil.getActivePeEndpoints();
List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
for (String endpoint : endpoints) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
index 0e9a3eb..8c7db7f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
@@ -21,19 +21,19 @@ import org.apache.streampipes.model.SpDataSet;
public class DataSetEntityUrlGenerator extends EndpointUrlGenerator<SpDataSet> {
- public DataSetEntityUrlGenerator(SpDataSet pipelineElement) {
- super(pipelineElement);
+ public DataSetEntityUrlGenerator(SpDataSet graph) {
+ super(graph);
}
@Override
public String generateInvokeEndpoint() {
- return pipelineElement.getElementId();
+ return graph.getElementId();
}
@Override
public String generateDetachEndpoint() {
- return pipelineElement.getElementId()
+ return graph.getElementId()
+ SLASH
- + pipelineElement.getDatasetInvocationId() ;
+ + graph.getDatasetInvocationId() ;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
index 4222c77..c05ed6e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
@@ -21,13 +21,14 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity;
public abstract class EndpointUrlGenerator<T extends NamedStreamPipesEntity> {
+ protected static final String HTTP_PROTOCOL = "http://";
+ protected static final String COLON = ":";
protected static final String SLASH = "/";
- protected static final String URLPREFIX = "http://";
- protected T pipelineElement;
+ protected T graph;
- public EndpointUrlGenerator(T pipelineElement) {
- this.pipelineElement = pipelineElement;
+ public EndpointUrlGenerator(T graph) {
+ this.graph = graph;
}
public abstract String generateInvokeEndpoint();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 9fec987..814f026 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -19,126 +19,87 @@ package org.apache.streampipes.manager.execution.http;
import org.apache.streampipes.config.consul.ConsulSpConfig;
import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.manager.node.AvailableNodesFetcher;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.node.NodeInfo;
-
-import java.util.Optional;
public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> {
- private static final String COLON = ":";
- private static final String SLASH = "/";
- protected static final String SINK_IDENTIFIER = "sec";
- protected static final String PROCESSOR_IDENTIFIER = "sepa";
- private static final String DEFAULT_NODE_ID = "default";
- private static final String PE_PORT_KEY = "SP_PORT";
- private static final String PE_HOST_KEY = "SP_HOST";
-
- private static final String NODE_CONTROLLER_ROUTE = "node/container";
+ private static final String DEFAULT_TARGET_NODE_ID = "default";
+ private static final String INVOKE_ROUTE = "api/v2/node/container/invoke";
+ private static final String DETACH_ROUTE = "api/v2/node/container/detach";
- public InvocableEntityUrlGenerator(InvocableStreamPipesEntity pipelineElement) {
- super(pipelineElement);
+ public InvocableEntityUrlGenerator(InvocableStreamPipesEntity graph) {
+ super(graph);
}
@Override
public String generateInvokeEndpoint() {
- if (pipelineElement.getDeploymentTargetNodeId() == null ||
- pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
+ if (isDefaultTarget()) {
// default deployments to primary pipeline element
-// return URLPREFIX
-// + getHost()
-// + SLASH
-// + getIdentifier()
-// + SLASH
-// + pipelineElement.getAppId();
- return defaultHost();
+ return getDefaultEndpoint();
} else {
// edge deployments to secondary pipeline element
- return URLPREFIX
- + getHost()
- + SLASH
- + NODE_CONTROLLER_ROUTE
- + "invoke"
- + SLASH
- + getIdentifier()
- + SLASH
- + pipelineElement.getAppId();
+ return getDeploymentTargetEndpoint(INVOKE_ROUTE);
}
}
@Override
public String generateDetachEndpoint() {
- // TODO: handle stop requests
- return generateInvokeEndpoint()
- + SLASH
- + pipelineElement.getDeploymentRunningInstanceId();
- }
-
- private String getHost() {
- if (pipelineElement.getDeploymentTargetNodeId() == null ||
- pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
- return defaultHost();
+ if (isDefaultTarget()) {
+ // detach primary pipeline element
+ return getDefaultEndpoint() + SLASH + graph.getDeploymentRunningInstanceId();
+ } else {
+ // detach edge deployments to secondary pipeline element
+ return getDeploymentTargetEndpoint(DETACH_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
}
- else {
- if (deploymentTargetNodeRunning()) {
-
- String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
- + pipelineElement.getElementEndpointServiceName()
- + SLASH
- + ConsulSpConfig.BASE_PREFIX
- + SLASH
- + ConsulSpConfig.SECONDARY_NODE_KEY
- + SLASH
- + pipelineElement.getDeploymentTargetNodeId()
- + SLASH;
-
- String host = ConsulUtil.getStringValue(route + PE_HOST_KEY);
- int port = ConsulUtil.getIntValue(route + PE_PORT_KEY);
+ }
- // Necessary because secondary pipeline element description is not stored in backend
- // It uses information from primary pipeline element. Node controller will locally forward
- // request accordingly, thus fields must be correct.
- pipelineElement.setElementEndpointHostname(host);
- pipelineElement.setElementEndpointPort(port);
- pipelineElement.setBelongsTo(URLPREFIX + host + COLON + port + SLASH + getIdentifier() + SLASH + pipelineElement.getAppId());
- pipelineElement.setElementId(pipelineElement.getBelongsTo() + SLASH + pipelineElement.getDeploymentRunningInstanceId());
+ // Helper methods
- return pipelineElement.getDeploymentTargetNodeHostname()
- + COLON
- + pipelineElement.getDeploymentTargetNodePort();
- }
- else {
- return defaultHost();
- }
- }
+ private boolean isDefaultTarget() {
+ return graph.getDeploymentTargetNodeId() == null ||
+ graph.getDeploymentTargetNodeId().equals(DEFAULT_TARGET_NODE_ID);
}
- private String defaultHost() {
-// return pipelineElement.getElementEndpointHostname()
-// + COLON
-// + pipelineElement.getElementEndpointPort();
- return pipelineElement.getBelongsTo();
+ private String getDefaultEndpoint() {
+ return graph.getBelongsTo();
}
- private String getIdentifier() {
- return pipelineElement instanceof DataProcessorInvocation ? PROCESSOR_IDENTIFIER : SINK_IDENTIFIER;
+ private String getDeploymentTargetEndpoint(String route) {
+ modifyInvocableElement();
+ return HTTP_PROTOCOL + graph.getDeploymentTargetNodeHostname() + COLON + graph.getDeploymentTargetNodePort()
+ + SLASH
+ + route
+ + SLASH
+ + getIdentifier()
+ + SLASH
+ + graph.getAppId();
}
- private Optional<NodeInfo> getNodeInfo() {
- return new AvailableNodesFetcher()
- .fetchNodes()
- .stream()
- .filter(node -> node.getNodeControllerId().equals(pipelineElement.getDeploymentTargetNodeId()))
- .findFirst();
+ private void modifyInvocableElement() {
+ // Necessary because secondary pipeline element description is not stored in backend
+ // It uses information from primary pipeline element. Node controller will locally forward
+ // request accordingly, thus fields must be correct.
+ String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
+ + graph.getElementEndpointServiceName()
+ + SLASH
+ + ConsulSpConfig.BASE_PREFIX
+ + SLASH
+ + ConsulSpConfig.SECONDARY_NODE_KEY
+ + SLASH
+ + graph.getDeploymentTargetNodeId()
+ + SLASH;
+
+ String host = ConsulUtil.getValueForRoute(route + "SP_HOST", String.class);
+ int port = ConsulUtil.getValueForRoute(route + "SP_PORT", Integer.class);
+ graph.setElementEndpointHostname(host);
+ graph.setElementEndpointPort(port);
+ graph.setBelongsTo(HTTP_PROTOCOL + host + COLON + port + SLASH + getIdentifier() + SLASH + graph.getAppId());
+ graph.setElementId(graph.getBelongsTo() + SLASH + graph.getDeploymentRunningInstanceId());
}
- private boolean deploymentTargetNodeRunning() {
- return new AvailableNodesFetcher()
- .fetchNodes()
- .stream()
- .anyMatch(node -> node.getNodeControllerId().equals(pipelineElement.getDeploymentTargetNodeId()));
+ private String getIdentifier() {
+ return graph instanceof DataProcessorInvocation ? "sepa" : "sec";
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index f1fe90f..597e9d0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -111,7 +111,7 @@ public class InvocationGraphBuilder {
// TODO: set event relay to true
// TODO: add target edge node broker to List<EventRelays>
- String broker = getEdgeBroker(t);
+ //String broker = getEdgeBroker(t);
t.getInputStreams()
.get(getIndex(source.getDOM(), t))
@@ -185,10 +185,11 @@ public class InvocationGraphBuilder {
}
private String getEdgeBroker(InvocableStreamPipesEntity target) {
- return ConsulUtil.getStringValue(
+ // TODO: no hardcoded route - only for testing
+ return ConsulUtil.getValueForRoute(
"sp/v1/node/org.apache.streampipes.node.controller/"
- + target.getDeploymentTargetNodeId()
- + "/config/SP_NODE_BROKER_HOST");
+ + target.getDeploymentTargetNodeHostname()
+ + "/config/SP_NODE_BROKER_HOST", String.class);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
index bba6d68..4c7e027 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
@@ -54,7 +54,7 @@ public class AvailableNodesFetcher {
private NodeInfo fetchNodeInfo(String activeNode) throws IOException {
String response = Request
- .Get(activeNode + "/node/info")
+ .Get(activeNode + "/api/v2/node/info")
.addHeader("Accept", MediaType.APPLICATION_JSON)
.execute()
.returnContent()
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
index 8d9d8c4..db9e3db 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
@@ -59,7 +59,7 @@ public class ConsulConfig extends AbstractRestInterface implements IConsulConfig
@Override
public Response getAllServiceConfigs() {
LOG.info("Request for all service configs");
- Map<String, Tuple2<String,String>> peServices = ConsulUtil.getPEServices();
+ Map<String, Tuple2<String,String>> peServices = ConsulUtil.getPeServices();
List<PeConfig> peConfigs = new LinkedList<>();
diff --git a/ui/src/app/core-model/gen/streampipes-model-client.ts b/ui/src/app/core-model/gen/streampipes-model-client.ts
index 8962d8a..4e18b90 100644
--- a/ui/src/app/core-model/gen/streampipes-model-client.ts
+++ b/ui/src/app/core-model/gen/streampipes-model-client.ts
@@ -19,7 +19,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-11-17 20:41:04.
+// Generated using typescript-generator version 2.24.612 on 2020-11-25 23:55:36.
export class FileMetadata {
createdAt: number;
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 1481060..712f6d4 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,10 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-11-17 20:40:59.
+// Generated using typescript-generator version 2.24.612 on 2020-11-25 23:55:30.
export class AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
if (!data) {
@@ -54,7 +54,7 @@ export class AccessibleSensorActuatorResource {
}
export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
+ "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
elementId: string;
static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
@@ -2147,6 +2147,7 @@ export class NodeMetadata {
nodeAddress: string;
nodeLocationTags: string[];
nodeModel: string;
+ nodeType: string;
static fromData(data: NodeMetadata, target?: NodeMetadata): NodeMetadata {
if (!data) {
@@ -2155,6 +2156,7 @@ export class NodeMetadata {
const instance = target || new NodeMetadata();
instance.nodeAddress = data.nodeAddress;
instance.nodeModel = data.nodeModel;
+ instance.nodeType = data.nodeType;
instance.nodeLocationTags = __getCopyArrayFn(__identity<string>())(data.nodeLocationTags);
return instance;
}
@@ -2285,6 +2287,31 @@ export class PipelineCategory {
}
}
+export class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
+ "@class": "org.apache.streampipes.model.node.PipelineElementDockerContainer";
+ containerName: string;
+ containerPorts: string[];
+ envVars: string[];
+ imageURI: string;
+ labels: { [index: string]: string };
+ serviceId: string;
+
+ static fromData(data: PipelineElementDockerContainer, target?: PipelineElementDockerContainer): PipelineElementDockerContainer {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new PipelineElementDockerContainer();
+ super.fromData(data, instance);
+ instance.imageURI = data.imageURI;
+ instance.containerName = data.containerName;
+ instance.serviceId = data.serviceId;
+ instance.containerPorts = __getCopyArrayFn(__identity<string>())(data.containerPorts);
+ instance.envVars = __getCopyArrayFn(__identity<string>())(data.envVars);
+ instance.labels = __getCopyObjectFn(__identity<string>())(data.labels);
+ return instance;
+ }
+}
+
export class PipelineElementRecommendation {
count: number;
description: string;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index b40f136..247508f 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -143,7 +143,6 @@ export class SavePipelineComponent implements OnInit {
modifyPipelineElementsDeployments(pipelineElements) {
pipelineElements.forEach(p => {
let selectedTargetNodeId = p.deploymentTargetNodeId
- console.log(selectedTargetNodeId);
if(selectedTargetNodeId != "default") {
let selectedNode = this.edgeNodes
.filter(node => node.nodeControllerId === selectedTargetNodeId)
@@ -155,7 +154,6 @@ export class SavePipelineComponent implements OnInit {
.map(node => node.nodeControllerPort)[0]
}
else {
- console.log('null');
p.deploymentTargetNodeHostname = null
p.deploymentTargetNodePort = null
}