You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/26 21:53:24 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-260] add
connection retry option for config and service registration
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 8b2d617 [STREAMPIPES-260] add connection retry option for config and service registration
8b2d617 is described below
commit 8b2d61736ae99ec0e724aba341f1fb52fda5034a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Nov 26 22:52:58 2020 +0100
[STREAMPIPES-260] add connection retry option for config and service registration
---
.../streampipes/config/consul/ConsulSpConfig.java | 79 ++++--
.../consul/ConsulServiceRegistrationBody.java | 90 ++++++
.../model/consul/HealthCheckConfiguration.java | 57 ++++
.../streampipes/container/util/ConsulUtil.java | 302 ++++++++++++++-------
.../manager/endpoint/EndpointFetcher.java | 2 +-
.../apache/streampipes/rest/impl/ConsulConfig.java | 2 +-
6 files changed, 413 insertions(+), 119 deletions(-)
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
index 19f8ca3..1b2c5ba 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
@@ -26,17 +26,25 @@ import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.config.SpConfigChangeCallback;
import org.apache.streampipes.config.model.ConfigItem;
import org.apache.streampipes.config.model.ConfigurationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.net.Socket;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class ConsulSpConfig extends SpConfig implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class.getCanonicalName());
+ private static final String SLASH = "/";
private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+ private static final int CONSUL_DEFAULT_PORT = 8500;
public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
+
private String serviceName;
private KeyValueClient kvClient;
@@ -47,32 +55,69 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
public ConsulSpConfig(String serviceName) {
super(serviceName);
- //TDOO use consul adress from an environment variable
+ Consul consul = consulInstance();
+ this.kvClient = consul.keyValueClient();
+ this.serviceName = serviceName;
+ }
+
+ public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
+ this(serviceName);
+ this.callback = callback;
+ this.configProps = new HashMap<>();
+ new Thread(this).start();
+ }
+
+ private static Consul consulInstance() {
+ boolean connected = false;
+ URL consulUrl = consulURL();
+
+ while (!connected) {
+ LOG.info("Trying to connect to Consul to register config items");
+ connected = isReady(consulUrl.getHost(), consulUrl.getPort());
+
+ if (!connected) {
+ LOG.info("Retrying in 1 second");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("Successfully connected to Consul");
+ return Consul.builder().withUrl(consulURL()).build();
+ }
+
+ private static URL consulURL() {
Map<String, String> env = System.getenv();
- Consul consul;
+ URL url = null;
+
if (env.containsKey(CONSUL_ENV_LOCATION)) {
- URL url = null;
try {
- url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+ url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
- consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
} else {
- consul = Consul.builder().build();
+ try {
+ url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
}
-
-// Consul consul = Consul.builder().build(); // connect to Consul on localhost
- kvClient = consul.keyValueClient();
-
- this.serviceName = serviceName;
+ return url;
}
- public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
- this(serviceName);
- this.callback = callback;
- this.configProps = new HashMap<>();
- new Thread(this).start();
+ public static boolean isReady(String host, int port) {
+ try {
+ InetSocketAddress sa = new InetSocketAddress(host, port);
+ Socket ss = new Socket();
+ ss.connect(sa, 1000);
+ ss.close();
+ } catch(Exception e) {
+ return false;
+ }
+ return true;
}
@Override
@@ -235,7 +280,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
}
private String addSn(String key) {
- return SERVICE_ROUTE_PREFIX + serviceName + "/" + key;
+ return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
}
private ConfigItem fromJson(String content) {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java
new file mode 100644
index 0000000..297cec7
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.model.consul;
+
+import java.util.List;
+
+public class ConsulServiceRegistrationBody {
+
+ private String ID;
+ private String Name;
+ private List<String> Tags;
+ private String Address;
+ private Integer Port;
+ private Boolean EnableTagOverride;
+ private HealthCheckConfiguration Check;
+
+ public ConsulServiceRegistrationBody() {
+ }
+
+ public String getID() {
+ return ID;
+ }
+
+ public void setID(String ID) {
+ this.ID = ID;
+ }
+
+ public String getName() {
+ return Name;
+ }
+
+ public void setName(String name) {
+ Name = name;
+ }
+
+ public List<String> getTags() {
+ return Tags;
+ }
+
+ public void setTags(List<String> tags) {
+ Tags = tags;
+ }
+
+ public String getAddress() {
+ return Address;
+ }
+
+ public void setAddress(String address) {
+ Address = address;
+ }
+
+ public Integer getPort() {
+ return Port;
+ }
+
+ public void setPort(Integer port) {
+ Port = port;
+ }
+
+ public Boolean getEnableTagOverride() {
+ return EnableTagOverride;
+ }
+
+ public void setEnableTagOverride(Boolean enableTagOverride) {
+ EnableTagOverride = enableTagOverride;
+ }
+
+ public HealthCheckConfiguration getCheck() {
+ return Check;
+ }
+
+ public void setCheck(HealthCheckConfiguration check) {
+ Check = check;
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java
new file mode 100644
index 0000000..209bd3d
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.model.consul;
+
+public class HealthCheckConfiguration {
+ private String Method;
+ private String http;
+ private String interval;
+
+ public HealthCheckConfiguration() {
+ }
+
+ public HealthCheckConfiguration(String method, String http, String interval) {
+ Method = method;
+ this.http = http;
+ this.interval = interval;
+ }
+
+ public String getMethod() {
+ return Method;
+ }
+
+ public void setMethod(String method) {
+ Method = method;
+ }
+
+ public String getHttp() {
+ return http;
+ }
+
+ public void setHttp(String http) {
+ this.http = http;
+ }
+
+ public String getInterval() {
+ return interval;
+ }
+
+ public void setInterval(String interval) {
+ this.interval = interval;
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index cc39a20..3929f67 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.container.util;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.orbitz.consul.AgentClient;
import com.orbitz.consul.Consul;
import com.orbitz.consul.HealthClient;
@@ -29,91 +30,177 @@ import com.orbitz.consul.model.health.ServiceHealth;
import com.orbitz.consul.model.kv.Value;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.config.model.ConfigItem;
+import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
+import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class ConsulUtil {
- private static final String PROTOCOL = "http://";
+ private static final Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
private static final String HEALTH_CHECK_INTERVAL = "10s";
- //private static final String HEALTH_CHECK_TTL = "15s";
- //private static final String CONSUL_DEREGISTER_SERIVER_AFTER = "10s";
- private static final String PE_SERVICE_NAME = "pe";
-
+ private static final String PE_SVC_TAG = "pe";
private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+ private static final int CONSUL_DEFAULT_PORT = 8500;
+ private static final String CONSUL_NAMESPACE = "/sp/v1/";
private static final String CONSUL_URL_REGISTER_SERVICE = "v1/agent/service/register";
- static Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
-
public static Consul consulInstance() {
return Consul.builder().withUrl(consulURL()).build();
}
- public static void registerPeService(String serviceID, String url, int port) {
- registerService(PE_SERVICE_NAME, serviceID, url, port, "pe");
+ /**
+ * Method called by {@link org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter} to register
+ * new pipeline element service endpoint.
+ *
+ * @param svcId unique service id
+ * @param host host address of pipeline element service endpoint
+ * @param port port of pipeline element service endpoint
+ */
+ public static void registerPeService(String svcId, String host, int port) {
+ registerService(PE_SVC_TAG, svcId, host, port, Collections.singletonList(PE_SVC_TAG));
}
- public static void registerService(String serviceName, String serviceID, String url, int port, String tag) {
- String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
- try {
- registerServiceHttpClient(body);
- LOG.info("Register service " + serviceID, "succesful");
- } catch (IOException e) {
- LOG.error("Register service: " + serviceID, " - " + e.toString());
+ /**
+ * Register service at Consul.
+ *
+ * @param svcGroup service group for registered service
+ * @param svcId unique service id
+ * @param host host address of service endpoint
+ * @param port port of service endpoint
+ * @param tags tags of service
+ */
+ public static void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+ boolean connected = false;
+
+ while (!connected) {
+ LOG.info("Trying to register service at Consul: " + svcId);
+ ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+ connected = registerServiceHttpClient(svcRegistration);
+
+ if (!connected) {
+ LOG.info("Retrying in 1 second");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
+ LOG.info("Successfully registered service at Consul: " + svcId);
}
- //NOT TESTED
- /* public static void subcribeHealthService() {
- Consul consul = consulInstance();
- HealthClient healthClient = consul.healthClient();
- Agent agent = consul.agentClient().getAgent();
-
+ /**
+ * PUT REST call to Consul API to register new service.
+ *
+ * @param svcRegistration service registration object used to register service endpoint
+ * @return true if the PUT request was executed without an IOException, false otherwise (the HTTP status is not inspected)
+ */
+ private static boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
+ try {
+ String endpoint = makeConsulEndpoint();
+ String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
- ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, PE_SERVICE_NAME);
+ Request.Put(endpoint)
+ .addHeader("accept", "application/json")
+ .body(new StringEntity(body))
+ .execute();
- svHealth.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
- @Override
- public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
- System.out.println("ad");
- }
- });
+ return true;
+ } catch (IOException e) {
+ LOG.error("Could not register service at Consul");
}
- */
+ return false;
+ }
+
+ // Getters, update methods
- public static Map<String, String> getPEServices() {
- LOG.info("Load PE service status");
+ /**
+ * Get all pipeline element service endpoints
+ *
+ * @return map of pipeline element service ids to their health check status
+ */
+ public static Map<String, String> getPeServices() {
+ LOG.info("Load pipeline element service status");
Consul consul = consulInstance();
AgentClient agent = consul.agentClient();
Map<String, Service> services = consul.agentClient().getServices();
Map<String, HealthCheck> checks = agent.getChecks();
- Map<String, String> peServices = new HashMap<>();
+ Map<String, String> peSvcs = new HashMap<>();
for (Map.Entry<String, Service> entry : services.entrySet()) {
- if (entry.getValue().getTags().contains(PE_SERVICE_NAME)) {
+ if (entry.getValue().getTags().contains(PE_SVC_TAG)) {
String serviceId = entry.getValue().getId();
String serviceStatus = "critical";
if (checks.containsKey("service:" + entry.getKey())) {
serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
}
LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
- peServices.put(serviceId, serviceStatus);
+ peSvcs.put(serviceId, serviceStatus);
}
}
- return peServices;
+ return peSvcs;
+ }
+
+ /**
+ * Get active pipeline element service endpoints
+ *
+ * @return list of pipeline element endpoints
+ */
+ public static List<String> getActivePeEndpoints() {
+ LOG.info("Load active pipeline element service endpoints");
+ return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PE_SVC_TAG));
}
+ /**
+ * Get service endpoints
+ *
+ * @param svcGroup service group for registered service
+ * @param restrictToHealthy if true, only healthy instances of the service group are returned; if false, all registered instances
+ * @param filterByTags only services carrying all of these tags are included in the result
+ * @return list of service endpoints in the form address:port
+ */
+ public static List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+ List<String> filterByTags) {
+ Consul consul = consulInstance();
+ HealthClient healthClient = consul.healthClient();
+ List<String> endpoints = new LinkedList<>();
+ List<ServiceHealth> nodes;
+
+ if (!restrictToHealthy) {
+ nodes = healthClient.getAllServiceInstances(svcGroup).getResponse();
+ } else {
+ nodes = healthClient.getHealthyServiceInstances(svcGroup).getResponse();
+ }
+ for (ServiceHealth node : nodes) {
+ if (node.getService().getTags().containsAll(filterByTags)) {
+ String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
+ LOG.info("Active " + svcGroup + " endpoint: " + endpoint);
+ endpoints.add(endpoint);
+ }
+ }
+ return endpoints;
+ }
+
+ /**
+ * Get key-value entries for a given route
+ *
+ * @param route route to retrieve key-value entries in Consul
+ * @return key-value entries
+ */
public static Map<String, String> getKeyValue(String route) {
Consul consul = consulInstance();
KeyValueClient keyValueClient = consul.keyValueClient();
@@ -129,82 +216,78 @@ public class ConsulUtil {
if (value.getValueAsString().isPresent()) {
v = value.getValueAsString().get();
}
- LOG.info("Load key: " + route + " value: " + v);
keyValues.put(key, v);
}
}
return keyValues;
}
- public static void updateConfig(String key, String entry, boolean password) {
- Consul consul = consulInstance();
- KeyValueClient keyValueClient = consul.keyValueClient();
-
- if (!password) {
- keyValueClient.putValue(key, entry);
+ /**
+ * Get specific value for a key in route
+ *
+ * @param route route to retrieve value
+ * @param type data type of return value, e.g. Integer.class, String.class
+ * @return value for key
+ */
+ public static <T> T getValueForRoute(String route, Class<T> type) {
+ try {
+ String entry = getKeyValue(route)
+ .values()
+ .stream()
+ .findFirst()
+ .orElse(null);
+
+ if (type.equals(Integer.class)) {
+ return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ } else if (type.equals(Boolean.class)) {
+ return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ } else {
+ return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+ }
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
}
-
-// keyValueClient.putValue(key + "_description", description);
-// keyValueClient.putValue(key + "_type", valueType);
- LOG.info("Updated config - key:" + key +
- " value: " + entry);
-// +
-// " description: " + description +
-// " type: " + valueType);
+ throw new IllegalArgumentException("Cannot get entry from Consul");
}
- public static List<String> getActivePEServicesEndPoints() {
- LOG.info("Load active PE services endpoints");
+ /**
+ * Update key-value config in Consul
+ *
+ * @param key key to be updated
+ * @param entry new entry
+ * @param password whether the value is a password; password values are not written by this method
+ */
+ public static void updateConfig(String key, String entry, boolean password) {
Consul consul = consulInstance();
- HealthClient healthClient = consul.healthClient();
- List<String> endpoints = new LinkedList<>();
-
- List<ServiceHealth> nodes = healthClient.getHealthyServiceInstances(PE_SERVICE_NAME).getResponse();
- for (ServiceHealth node : nodes) {
- String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
- LOG.info("Active PE endpoint:" + endpoint);
- endpoints.add(endpoint);
+ if (!password) {
+ LOG.info("Updated config - key:" + key + " value: " + entry);
+ consul.keyValueClient().putValue(key, entry);
}
- return endpoints;
}
- public static void deregisterService(String serviceId) {
+ /**
+ * Deregister registered service endpoint in Consul
+ *
+ * @param svcId service id of endpoint to be deregistered
+ */
+ public static void deregisterService(String svcId) {
Consul consul = consulInstance();
-
- consul.agentClient().deregister(serviceId);
- LOG.info("Deregistered Service: " + serviceId);
+ LOG.info("Deregister service: " + svcId);
+ consul.agentClient().deregister(svcId);
}
- private static int registerServiceHttpClient(String body) throws IOException {
- return Request.Put(consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE)
- .addHeader("accept", "application/json")
- .body(new StringEntity(body))
- .execute()
- .returnResponse()
- .getStatusLine().getStatusCode();
+ /**
+ * Delete config in Consul
+ *
+ * @param key key to be deleted
+ */
+ public static void deleteConfig(String key) {
+ Consul consul = consulInstance();
+ LOG.info("Delete config: {}", key);
+ consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
}
- private static String createServiceRegisterBody(String name, String id, String url, int port, String tag) {
- String healthCheckURL = PROTOCOL + url + ":" + port;
-
- return "{" +
- "\"ID\": \"" + id + "\"," +
- "\"Name\": \"" + name + "\"," +
- "\"Tags\": [" +
- " \"" + tag + "\"" + ",\"urlprefix-/" + id + " strip=/" + id + "\"" +
- " ]," +
- " \"Address\": \"" + PROTOCOL + url + "\"," +
- " \"Port\":" + port + "," +
- " \"EnableTagOverride\": true" + "," +
- "\"Check\": {" +
- " \"Method\": \"GET\"" + "," +
- " \"http\":" + "\"" + healthCheckURL + "\"," +
- // " \"DeregisterCriticalServiceAfter\":" + "\"" + CONSUL_DEREGISTER_SERIVER_AFTER + "\"," +
- " \"interval\":" + "\"" + HEALTH_CHECK_INTERVAL + "\"" + //"," +
- //" \"TTL\":" + "\"" + HEALTH_CHECK_TTL + "\"" +
- " }" +
- "}";
- }
+ // Helper methods
private static URL consulURL() {
Map<String, String> env = System.getenv();
@@ -212,17 +295,36 @@ public class ConsulUtil {
if (env.containsKey(CONSUL_ENV_LOCATION)) {
try {
- url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+ url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
} else {
try {
- url = new URL("http", "localhost", 8500, "");
+ url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
return url;
}
+
+ private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
+ int port, List<String> tags) {
+ ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+ body.setID(id);
+ body.setName(svcGroup);
+ body.setTags(tags);
+ body.setAddress(HTTP_PROTOCOL + host);
+ body.setPort(port);
+ body.setEnableTagOverride(true);
+ body.setCheck(new HealthCheckConfiguration("GET",
+ (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+
+ return body;
+ }
+
+ private static String makeConsulEndpoint() {
+ return consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE;
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index 3604171..7959ce6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -30,7 +30,7 @@ import java.util.stream.Stream;
public class EndpointFetcher {
public List<RdfEndpoint> getEndpoints() {
- List<String> endpoints = ConsulUtil.getActivePEServicesEndPoints();
+ List<String> endpoints = ConsulUtil.getActivePeEndpoints();
List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
for (String endpoint : endpoints) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
index b3b834e..df407dc 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
@@ -58,7 +58,7 @@ public class ConsulConfig extends AbstractRestInterface implements IConsulConfig
@Override
public Response getAllServiceConfigs() {
LOG.info("Request for all service configs");
- Map<String, String> peServices = ConsulUtil.getPEServices();
+ Map<String, String> peServices = ConsulUtil.getPeServices();
List<PeConfig> peConfigs = new LinkedList<>();