You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/26 21:53:24 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-260] add connection retry option for config and service registration

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8b2d617  [STREAMPIPES-260] add connection retry option for config and service registration
8b2d617 is described below

commit 8b2d61736ae99ec0e724aba341f1fb52fda5034a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Nov 26 22:52:58 2020 +0100

    [STREAMPIPES-260] add connection retry option for config and service registration
---
 .../streampipes/config/consul/ConsulSpConfig.java  |  79 ++++--
 .../consul/ConsulServiceRegistrationBody.java      |  90 ++++++
 .../model/consul/HealthCheckConfiguration.java     |  57 ++++
 .../streampipes/container/util/ConsulUtil.java     | 302 ++++++++++++++-------
 .../manager/endpoint/EndpointFetcher.java          |   2 +-
 .../apache/streampipes/rest/impl/ConsulConfig.java |   2 +-
 6 files changed, 413 insertions(+), 119 deletions(-)

diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
index 19f8ca3..1b2c5ba 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
@@ -26,17 +26,25 @@ import org.apache.streampipes.config.SpConfig;
 import org.apache.streampipes.config.SpConfigChangeCallback;
 import org.apache.streampipes.config.model.ConfigItem;
 import org.apache.streampipes.config.model.ConfigurationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
+import java.net.Socket;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 public class ConsulSpConfig extends SpConfig implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class.getCanonicalName());
 
+    private static final String SLASH = "/";
     private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+    private static final int CONSUL_DEFAULT_PORT = 8500;
     public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
+
     private String serviceName;
     private  KeyValueClient kvClient;
 
@@ -47,32 +55,69 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
 
     public ConsulSpConfig(String serviceName) {
         super(serviceName);
-        //TDOO use consul adress from an environment variable
+        Consul consul = consulInstance();
+        this.kvClient = consul.keyValueClient();
+        this.serviceName = serviceName;
+    }
+
+    public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
+        this(serviceName);
+        this.callback = callback;
+        this.configProps = new HashMap<>();
+        new Thread(this).start();
+    }
+
+    private static Consul consulInstance() {
+        boolean connected = false;
+        URL consulUrl = consulURL();
+
+        while (!connected) {
+            LOG.info("Trying to connect to Consul to register config items");
+            connected = isReady(consulUrl.getHost(), consulUrl.getPort());
+
+            if (!connected) {
+                LOG.info("Retrying in 1 second");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        LOG.info("Successfully connected to Consul");
+        return Consul.builder().withUrl(consulURL()).build();
+    }
+
+    private static URL consulURL() {
         Map<String, String> env = System.getenv();
-        Consul consul;
+        URL url = null;
+
         if (env.containsKey(CONSUL_ENV_LOCATION)) {
-            URL url = null;
             try {
-                url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+                url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
             } catch (MalformedURLException e) {
                 e.printStackTrace();
             }
-            consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
         } else {
-            consul = Consul.builder().build();
+            try {
+                url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
+            } catch (MalformedURLException e) {
+                e.printStackTrace();
+            }
         }
-
-//        Consul consul = Consul.builder().build(); // connect to Consul on localhost
-        kvClient = consul.keyValueClient();
-
-        this.serviceName = serviceName;
+        return url;
     }
 
-    public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
-        this(serviceName);
-        this.callback = callback;
-        this.configProps = new HashMap<>();
-        new Thread(this).start();
+    public static boolean isReady(String host, int port) {
+        try {
+            InetSocketAddress sa = new InetSocketAddress(host, port);
+            Socket ss = new Socket();
+            ss.connect(sa, 1000);
+            ss.close();
+        } catch(Exception e) {
+            return false;
+        }
+        return true;
     }
 
     @Override
@@ -235,7 +280,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
     }
 
     private String addSn(String key) {
-       return SERVICE_ROUTE_PREFIX + serviceName + "/" + key;
+       return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
     }
 
     private ConfigItem fromJson(String content) {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java
new file mode 100644
index 0000000..297cec7
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/ConsulServiceRegistrationBody.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.model.consul;
+
+import java.util.List;
+
+public class ConsulServiceRegistrationBody {
+
+    private String ID;
+    private String Name;
+    private List<String> Tags;
+    private String Address;
+    private Integer Port;
+    private Boolean EnableTagOverride;
+    private HealthCheckConfiguration Check;
+
+    public ConsulServiceRegistrationBody() {
+    }
+
+    public String getID() {
+        return ID;
+    }
+
+    public void setID(String ID) {
+        this.ID = ID;
+    }
+
+    public String getName() {
+        return Name;
+    }
+
+    public void setName(String name) {
+        Name = name;
+    }
+
+    public List<String> getTags() {
+        return Tags;
+    }
+
+    public void setTags(List<String> tags) {
+        Tags = tags;
+    }
+
+    public String getAddress() {
+        return Address;
+    }
+
+    public void setAddress(String address) {
+        Address = address;
+    }
+
+    public Integer getPort() {
+        return Port;
+    }
+
+    public void setPort(Integer port) {
+        Port = port;
+    }
+
+    public Boolean getEnableTagOverride() {
+        return EnableTagOverride;
+    }
+
+    public void setEnableTagOverride(Boolean enableTagOverride) {
+        EnableTagOverride = enableTagOverride;
+    }
+
+    public HealthCheckConfiguration getCheck() {
+        return Check;
+    }
+
+    public void setCheck(HealthCheckConfiguration check) {
+        Check = check;
+    }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java
new file mode 100644
index 0000000..209bd3d
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/consul/HealthCheckConfiguration.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.container.model.consul;
+
+public class HealthCheckConfiguration {
+    private String Method;
+    private String http;
+    private String interval;
+
+    public HealthCheckConfiguration() {
+    }
+
+    public HealthCheckConfiguration(String method, String http, String interval) {
+        Method = method;
+        this.http = http;
+        this.interval = interval;
+    }
+
+    public String getMethod() {
+        return Method;
+    }
+
+    public void setMethod(String method) {
+        Method = method;
+    }
+
+    public String getHttp() {
+        return http;
+    }
+
+    public void setHttp(String http) {
+        this.http = http;
+    }
+
+    public String getInterval() {
+        return interval;
+    }
+
+    public void setInterval(String interval) {
+        this.interval = interval;
+    }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index cc39a20..3929f67 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.container.util;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.orbitz.consul.AgentClient;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.HealthClient;
@@ -29,91 +30,177 @@ import com.orbitz.consul.model.health.ServiceHealth;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.config.model.ConfigItem;
+import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
+import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class ConsulUtil {
 
-  private static final String PROTOCOL = "http://";
+  private static final Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
 
+  private static final String HTTP_PROTOCOL = "http://";
+  private static final String COLON = ":";
+  private static final String SLASH = "/";
   private static final String HEALTH_CHECK_INTERVAL = "10s";
-  //private static final String HEALTH_CHECK_TTL = "15s";
-  //private static final String CONSUL_DEREGISTER_SERIVER_AFTER = "10s";
-  private static final String PE_SERVICE_NAME = "pe";
-
+  private static final String PE_SVC_TAG = "pe";
   private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+  private static final int CONSUL_DEFAULT_PORT = 8500;
+  private static final String CONSUL_NAMESPACE = "/sp/v1/";
   private static final String CONSUL_URL_REGISTER_SERVICE = "v1/agent/service/register";
 
-  static Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
-
   public static Consul consulInstance() {
     return Consul.builder().withUrl(consulURL()).build();
   }
 
-  public static void registerPeService(String serviceID, String url, int port) {
-    registerService(PE_SERVICE_NAME, serviceID, url, port, "pe");
+  /**
+   * Method called by {@link org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter} to register
+   * new pipeline element service endpoint.
+   *
+   * @param svcId unique service id
+   * @param host  host address of pipeline element service endpoint
+   * @param port  port of pipeline element service endpoint
+   */
+  public static void registerPeService(String svcId, String host, int port) {
+    registerService(PE_SVC_TAG, svcId, host, port, Collections.singletonList(PE_SVC_TAG));
   }
 
-  public static void registerService(String serviceName, String serviceID, String url, int port, String tag) {
-    String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
-    try {
-      registerServiceHttpClient(body);
-      LOG.info("Register service " + serviceID, "succesful");
-    } catch (IOException e) {
-      LOG.error("Register service: " + serviceID, " - " + e.toString());
+  /**
+   * Register service at Consul.
+   *
+   * @param svcGroup  service group for registered service
+   * @param svcId     unique service id
+   * @param host      host address of service endpoint
+   * @param port      port of service endpoint
+   * @param tags      tags of service
+   */
+  public static void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+    boolean connected = false;
+
+    while (!connected) {
+      LOG.info("Trying to register service at Consul: " + svcId);
+      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+      connected = registerServiceHttpClient(svcRegistration);
+
+      if (!connected) {
+        LOG.info("Retrying in 1 second");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
     }
+    LOG.info("Successfully registered service at Consul: " + svcId);
   }
 
-  //NOT TESTED
- /*   public static void subcribeHealthService() {
-        Consul consul = consulInstance();
-        HealthClient healthClient = consul.healthClient();
-        Agent agent = consul.agentClient().getAgent();
-
+  /**
+   * PUT REST call to Consul API to register new service.
+   *
+   * @param svcRegistration   service registration object used to register service endpoint
+   * @return                  success or failure of service registration
+   */
+  private static boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
+    try {
+      String endpoint = makeConsulEndpoint();
+      String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
 
-        ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, PE_SERVICE_NAME);
+      Request.Put(endpoint)
+              .addHeader("accept", "application/json")
+              .body(new StringEntity(body))
+              .execute();
 
-        svHealth.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
-            @Override
-            public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
-                System.out.println("ad");
-            }
-        });
+      return true;
+    } catch (IOException e) {
+      LOG.error("Could not register service at Consul");
     }
-    */
+    return false;
+  }
+
+  // Getters, update methods
 
-  public static Map<String, String> getPEServices() {
-    LOG.info("Load PE service status");
+  /**
+   * Get all pipeline element service endpoints
+   *
+   * @return list of pipeline element service endpoints
+   */
+  public static Map<String, String> getPeServices() {
+    LOG.info("Load pipeline element service status");
     Consul consul = consulInstance();
     AgentClient agent = consul.agentClient();
 
     Map<String, Service> services = consul.agentClient().getServices();
     Map<String, HealthCheck> checks = agent.getChecks();
 
-    Map<String, String> peServices = new HashMap<>();
+    Map<String, String> peSvcs = new HashMap<>();
 
     for (Map.Entry<String, Service> entry : services.entrySet()) {
-      if (entry.getValue().getTags().contains(PE_SERVICE_NAME)) {
+      if (entry.getValue().getTags().contains(PE_SVC_TAG)) {
         String serviceId = entry.getValue().getId();
         String serviceStatus = "critical";
         if (checks.containsKey("service:" + entry.getKey())) {
           serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
         }
         LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
-        peServices.put(serviceId, serviceStatus);
+        peSvcs.put(serviceId, serviceStatus);
       }
     }
-    return peServices;
+    return peSvcs;
+  }
+
+  /**
+   * Get active pipeline element service endpoints
+   *
+   * @return list of pipeline element endpoints
+   */
+  public static List<String> getActivePeEndpoints() {
+    LOG.info("Load active pipeline element service endpoints");
+    return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PE_SVC_TAG));
   }
 
+  /**
+   * Get service endpoints
+   *
+   * @param svcGroup            service group for registered service
+   * @param restrictToHealthy   retrieve healthy or all registered services for a service group
+   * @param filterByTags        filter param to filter list of registered services
+   * @return                    list of services
+   */
+  public static List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+                                                 List<String> filterByTags) {
+    Consul consul = consulInstance();
+    HealthClient healthClient = consul.healthClient();
+    List<String> endpoints = new LinkedList<>();
+    List<ServiceHealth> nodes;
+
+    if (!restrictToHealthy) {
+      nodes = healthClient.getAllServiceInstances(svcGroup).getResponse();
+    } else {
+      nodes = healthClient.getHealthyServiceInstances(svcGroup).getResponse();
+    }
+    for (ServiceHealth node : nodes) {
+      if (node.getService().getTags().containsAll(filterByTags)) {
+        String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
+        LOG.info("Active " + svcGroup + " endpoint: " + endpoint);
+        endpoints.add(endpoint);
+      }
+    }
+    return endpoints;
+  }
+
+  /**
+   * Get key-value entries for a given route
+   *
+   * @param route route to retrieve key-value entries in Consul
+   * @return      key-value entries
+   */
   public static Map<String, String> getKeyValue(String route) {
     Consul consul = consulInstance();
     KeyValueClient keyValueClient = consul.keyValueClient();
@@ -129,82 +216,78 @@ public class ConsulUtil {
         if (value.getValueAsString().isPresent()) {
           v = value.getValueAsString().get();
         }
-        LOG.info("Load key: " + route + " value: " + v);
         keyValues.put(key, v);
       }
     }
     return keyValues;
   }
 
-  public static void updateConfig(String key, String entry, boolean password) {
-    Consul consul = consulInstance();
-    KeyValueClient keyValueClient = consul.keyValueClient();
-
-    if (!password) {
-      keyValueClient.putValue(key, entry);
+  /**
+   * Get specific value for a key in route
+   *
+   * @param route   route to retrieve value
+   * @param type    data type of return value, e.g. Integer.class, String.class
+   * @return        value for key
+   */
+  public static <T> T getValueForRoute(String route, Class<T> type) {
+    try {
+      String entry = getKeyValue(route)
+              .values()
+              .stream()
+              .findFirst()
+              .orElse(null);
+
+      if (type.equals(Integer.class)) {
+        return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      } else if (type.equals(Boolean.class)) {
+        return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      } else {
+        return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      }
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
     }
-
-//        keyValueClient.putValue(key + "_description", description);
-//        keyValueClient.putValue(key + "_type", valueType);
-    LOG.info("Updated config - key:" + key +
-            " value: " + entry);
-//        +
-//                " description: " + description +
-//                " type: " + valueType);
+    throw new IllegalArgumentException("Cannot get entry from Consul");
   }
 
-  public static List<String> getActivePEServicesEndPoints() {
-    LOG.info("Load active PE services endpoints");
+  /**
+   * Update key-value config in Consul
+   *
+   * @param key       key to be updated
+   * @param entry     new entry
+   * @param password  whether the value is a password; only non-sensitive values are updated
+   */
+  public static void updateConfig(String key, String entry, boolean password) {
     Consul consul = consulInstance();
-    HealthClient healthClient = consul.healthClient();
-    List<String> endpoints = new LinkedList<>();
-
-    List<ServiceHealth> nodes = healthClient.getHealthyServiceInstances(PE_SERVICE_NAME).getResponse();
-    for (ServiceHealth node : nodes) {
-      String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
-      LOG.info("Active PE endpoint:" + endpoint);
-      endpoints.add(endpoint);
+    if (!password) {
+      LOG.info("Updated config - key:" + key + " value: " + entry);
+      consul.keyValueClient().putValue(key, entry);
     }
-    return endpoints;
   }
 
-  public static void deregisterService(String serviceId) {
+  /**
+   * Deregister registered service endpoint in Consul
+   *
+   * @param svcId     service id of endpoint to be deregistered
+   */
+  public static void deregisterService(String svcId) {
     Consul consul = consulInstance();
-
-    consul.agentClient().deregister(serviceId);
-    LOG.info("Deregistered Service: " + serviceId);
+    LOG.info("Deregister service: " + svcId);
+    consul.agentClient().deregister(svcId);
   }
 
-  private static int registerServiceHttpClient(String body) throws IOException {
-    return Request.Put(consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE)
-            .addHeader("accept", "application/json")
-            .body(new StringEntity(body))
-            .execute()
-            .returnResponse()
-            .getStatusLine().getStatusCode();
+  /**
+   * Delete config in Consul
+   *
+   * @param key     key to be deleted
+   */
+  public static void deleteConfig(String key) {
+    Consul consul = consulInstance();
+    LOG.info("Delete config: {}", key);
+    consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
   }
 
-  private static String createServiceRegisterBody(String name, String id, String url, int port, String tag) {
-    String healthCheckURL = PROTOCOL + url + ":" + port;
-
-    return "{" +
-            "\"ID\": \"" + id + "\"," +
-            "\"Name\": \"" + name + "\"," +
-            "\"Tags\": [" +
-            "    \"" + tag + "\"" + ",\"urlprefix-/" + id + " strip=/" + id + "\"" +
-            " ]," +
-            " \"Address\": \"" + PROTOCOL + url + "\"," +
-            " \"Port\":" + port + "," +
-            " \"EnableTagOverride\": true" + "," +
-            "\"Check\": {" +
-            " \"Method\": \"GET\"" + "," +
-            " \"http\":" + "\"" + healthCheckURL + "\"," +
-            //  " \"DeregisterCriticalServiceAfter\":" +  "\"" + CONSUL_DEREGISTER_SERIVER_AFTER + "\"," +
-            " \"interval\":" + "\"" + HEALTH_CHECK_INTERVAL + "\"" + //"," +
-            //" \"TTL\":" + "\"" + HEALTH_CHECK_TTL + "\"" +
-            " }" +
-            "}";
-  }
+  // Helper methods
 
   private static URL consulURL() {
     Map<String, String> env = System.getenv();
@@ -212,17 +295,36 @@ public class ConsulUtil {
 
     if (env.containsKey(CONSUL_ENV_LOCATION)) {
       try {
-        url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+        url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
       } catch (MalformedURLException e) {
         e.printStackTrace();
       }
     } else {
       try {
-        url = new URL("http", "localhost", 8500, "");
+        url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
       } catch (MalformedURLException e) {
         e.printStackTrace();
       }
     }
     return url;
   }
+
+  private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
+                                                                      int port, List<String> tags) {
+    ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+    body.setID(id);
+    body.setName(svcGroup);
+    body.setTags(tags);
+    body.setAddress(HTTP_PROTOCOL + host);
+    body.setPort(port);
+    body.setEnableTagOverride(true);
+    body.setCheck(new HealthCheckConfiguration("GET",
+            (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+
+    return body;
+  }
+
+  private static String makeConsulEndpoint() {
+    return consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE;
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index 3604171..7959ce6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -30,7 +30,7 @@ import java.util.stream.Stream;
 public class EndpointFetcher {
 
   public List<RdfEndpoint> getEndpoints() {
-    List<String> endpoints = ConsulUtil.getActivePEServicesEndPoints();
+    List<String> endpoints = ConsulUtil.getActivePeEndpoints();
     List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
 
     for (String endpoint : endpoints) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
index b3b834e..df407dc 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
@@ -58,7 +58,7 @@ public class ConsulConfig extends AbstractRestInterface implements IConsulConfig
   @Override
   public Response getAllServiceConfigs() {
     LOG.info("Request for all service configs");
-    Map<String, String> peServices = ConsulUtil.getPEServices();
+    Map<String, String> peServices = ConsulUtil.getPeServices();
 
     List<PeConfig> peConfigs = new LinkedList<>();