You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/04 16:06:58 UTC

[streampipes] 04/05: add checkstyle to streampipes-service-discovery-consul

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch add-checkstyle-streampipes-service
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit f08f098fa2f8824b706e100ce3c38a461e05b9c4
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Dec 4 17:05:09 2022 +0100

    add checkstyle to streampipes-service-discovery-consul
---
 streampipes-service-discovery-consul/pom.xml       |  25 +-
 .../consul/ConsulHealthServiceManager.java         |  29 +-
 .../svcdiscovery/consul/ConsulProvider.java        |   5 +-
 .../svcdiscovery/consul/ConsulSpConfig.java        | 376 +++++++++++----------
 .../svcdiscovery/consul/SpConsulKvManagement.java  |  15 +-
 .../consul/SpConsulServiceDiscovery.java           |  45 +--
 6 files changed, 266 insertions(+), 229 deletions(-)

diff --git a/streampipes-service-discovery-consul/pom.xml b/streampipes-service-discovery-consul/pom.xml
index 2d79d0eb0..a2fef9f59 100644
--- a/streampipes-service-discovery-consul/pom.xml
+++ b/streampipes-service-discovery-consul/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
@@ -54,4 +55,26 @@
             <artifactId>fluent-hc</artifactId>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <logViolationsToConsole>true</logViolationsToConsole>
+                    <failOnViolation>true</failOnViolation>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
index c289ec2ef..d8d2d642b 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
@@ -17,12 +17,13 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.HealthClient;
 import com.orbitz.consul.cache.ServiceHealthCache;
 import com.orbitz.consul.model.health.ServiceHealth;
 import com.orbitz.consul.option.QueryOptions;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +39,9 @@ public enum ConsulHealthServiceManager {
   INSTANCE;
 
   private static final Logger LOG = LoggerFactory.getLogger(ConsulHealthServiceManager.class);
-
+  private static final int MAX_RETRIES = 3;
   private final Consul client;
   private final Map<String, ServiceHealthCache> serviceHealthCaches;
-  private final int MAX_RETRIES = 3;
 
   ConsulHealthServiceManager() {
     serviceHealthCaches = new HashMap<>();
@@ -72,11 +72,12 @@ public enum ConsulHealthServiceManager {
     List<ServiceHealth> activeServices = findService(serviceGroup, 0);
 
     return activeServices
-            .stream()
-            .filter(service -> allFiltersSupported(service, filterByTags))
-            .filter(service -> !restrictToHealthy || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
-            .map(this::makeServiceUrl)
-            .collect(Collectors.toList());
+        .stream()
+        .filter(service -> allFiltersSupported(service, filterByTags))
+        .filter(service -> !restrictToHealthy
+            || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
+        .map(this::makeServiceUrl)
+        .collect(Collectors.toList());
   }
 
   private String makeServiceUrl(ServiceHealth service) {
@@ -91,14 +92,14 @@ public enum ConsulHealthServiceManager {
   private List<ServiceHealth> findService(String serviceGroup, int retryCount) {
 
     if (serviceHealthCaches.containsKey(serviceGroup)
-            && serviceHealthCaches.get(serviceGroup).getMap() != null) {
+        && serviceHealthCaches.get(serviceGroup).getMap() != null) {
       ServiceHealthCache cache = serviceHealthCaches.get(serviceGroup);
       return cache
-              .getMap()
-              .values()
-              .stream()
-              .filter((value) -> value.getService().getService().equals(serviceGroup))
-              .collect(Collectors.toList());
+          .getMap()
+          .values()
+          .stream()
+          .filter((value) -> value.getService().getService().equals(serviceGroup))
+          .collect(Collectors.toList());
     } else {
       if (retryCount < MAX_RETRIES) {
         try {
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index 76e5c7b2d..49d860555 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -17,8 +17,9 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
 import org.apache.streampipes.commons.constants.Envs;
+
+import com.orbitz.consul.Consul;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,7 @@ public class ConsulProvider {
           e.printStackTrace();
         }
       }
-    } while(!connected);
+    } while (!connected);
 
     LOG.info("Successfully connected to Consul");
     return Consul.builder().withUrl(consulURL()).build();
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index 0792e8f5c..9f3f6ddec 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -18,204 +18,208 @@
 
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItemUtils;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Map;
 import java.util.Optional;
 
 public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
-
-    private static final String SLASH = "/";
-    public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
-
-    private final String serviceName;
-    private final KeyValueClient kvClient;
-
-    // TODO Implement mechanism to update the client when some configuration parameters change in Consul
-    private Map<String, Object> configProps;
-
-    public ConsulSpConfig(String serviceName) {
-        Consul consul = consulInstance();
-        this.kvClient = consul.keyValueClient();
-        this.serviceName = serviceName;
-    }
-
-    @Override
-    public <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope) {
-        register(key, String.valueOf(defaultValue), ConfigItemUtils.getValueType(defaultValue), description, configurationScope, false);
-    }
-
-    @Override
-    public void register(String key, boolean defaultValue, String description) {
-        register(key, Boolean.toString(defaultValue), "xs:boolean", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
-    }
-
-    @Override
-    public void register(String key, int defaultValue, String description) {
-        register(key, Integer.toString(defaultValue), "xs:integer", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
-    }
-
-    @Override
-    public void register(String key, double defaultValue, String description) {
-        register(key, Double.toString(defaultValue), "xs:double", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
-
-    }
-
-    @Override
-    public void register(String key, String defaultValue, String description) {
-        register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
-    }
-
-    @Override
-    public void registerObject(String key, Object defaultValue, String description) {
-        Optional<String> i = kvClient.getValueAsString(addSn(key));
-        if (!i.isPresent()) {
-            kvClient.putValue(addSn(key), toJson(defaultValue));
-        }
-    }
-
-    @Override
-    public void registerPassword(String key, String defaultValue, String description) {
-        register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, true);
-    }
-
-    @Override
-    public void register(ConfigItem configItem) {
-        String key = addSn(configItem.getKey());
-        Optional<String> i = kvClient.getValueAsString(key);
-
-        if (!i.isPresent()) {
-            // Set the value of environment variable as default
-            String envVariable = System.getenv(configItem.getKey());
-            if (envVariable != null) {
-                configItem.setValue(envVariable);
-                kvClient.putValue(key, toJson(configItem));
-            } else {
-                kvClient.putValue(key, toJson(configItem));
-            }
-        }
-    }
-
-    private void register(String key, String defaultValue, String valueType, String description, ConfigurationScope configurationScope, boolean isPassword) {
-        ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
-        register(configItem);
-
-        if (configProps != null) {
-            configProps.put(key, getString(key));
-        }
-    }
-
-    @Override
-    public boolean getBoolean(String key) {
-        return Boolean.parseBoolean(getString(key));
-    }
-
-    @Override
-    public int getInteger(String key) {
-        return Integer.parseInt(getString(key));
-    }
-
-    @Override
-    public double getDouble(String key) {
-        return Double.parseDouble(getString(key));
-    }
-
-    @Override
-    public String getString(String key) {
-      return getConfigItem(key).getValue();
-    }
-
-    @Override
-    public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
-        Optional<String> os = kvClient.getValueAsString(addSn(key));
-        if (os.isPresent()) {
-            try {
-                return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
-            } catch (JsonProcessingException e) {
-                LOG.info("Could not deserialize object", e);
-                return defaultValue;
-            }
-        } else {
-            return defaultValue;
-        }
-    }
-
-    @Override
-    public ConfigItem getConfigItem(String key) {
-      Optional<String> os = kvClient.getValueAsString(addSn(key));
-
-      return fromJson(os.get());
-    }
-
-    @Override
-    public void setBoolean(String key, Boolean value) {
-        setString(key, value.toString());
-    }
-
-    @Override
-    public void setInteger(String key, int value) {
-        setString(key, String.valueOf(value));
-    }
-
-    @Override
-    public void setDouble(String key, double value) {
-        setString(key, String.valueOf(value));
-    }
-
-    @Override
-    public void setString(String key, String value) {
-        kvClient.putValue(addSn(key), value);
-    }
-
-    @Override
-    public void setObject(String key, Object value) {
-        kvClient.putValue(addSn(key), toJson(value));
-    }
-
-    private String addSn(String key) {
-       return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
-    }
-
-    private ConfigItem fromJson(String content) {
-        try {
-          return JacksonSerializer.getObjectMapper().readValue(content, ConfigItem.class);
-        } catch (Exception e) {
-          // if old config is used, this is a fallback
-          ConfigItem configItem = new ConfigItem();
-          configItem.setValue(content);
-          return configItem;
-        }
-    }
-
-    private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope, boolean password) {
-        ConfigItem configItem = new ConfigItem();
-        configItem.setValueType(valueType);
-        configItem.setDescription(description);
-        configItem.setPassword(password);
-        configItem.setConfigurationScope(configurationScope);
-
-        return configItem;
-    }
-
-    private String toJson(Object object) {
-        try {
-            return JacksonSerializer.getObjectMapper().writeValueAsString(object);
-        } catch (JsonProcessingException e) {
-            LOG.info("Could not serialize object to JSON", e);
-            return "";
-        }
-    }
+  public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
+  private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
+  private static final String SLASH = "/";
+  private final String serviceName;
+  private final KeyValueClient kvClient;
+
+  // TODO Implement mechanism to update the client when some configuration parameters change in Consul
+  private Map<String, Object> configProps;
+
+  public ConsulSpConfig(String serviceName) {
+    Consul consul = consulInstance();
+    this.kvClient = consul.keyValueClient();
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  public <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope) {
+    register(key, String.valueOf(defaultValue), ConfigItemUtils.getValueType(defaultValue), description,
+        configurationScope, false);
+  }
+
+  @Override
+  public void register(String key, boolean defaultValue, String description) {
+    register(key, Boolean.toString(defaultValue), "xs:boolean", description,
+        ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+  }
+
+  @Override
+  public void register(String key, int defaultValue, String description) {
+    register(key, Integer.toString(defaultValue), "xs:integer", description,
+        ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+  }
+
+  @Override
+  public void register(String key, double defaultValue, String description) {
+    register(key, Double.toString(defaultValue), "xs:double", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG,
+        false);
+
+  }
+
+  @Override
+  public void register(String key, String defaultValue, String description) {
+    register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+  }
+
+  @Override
+  public void registerObject(String key, Object defaultValue, String description) {
+    Optional<String> i = kvClient.getValueAsString(addSn(key));
+    if (!i.isPresent()) {
+      kvClient.putValue(addSn(key), toJson(defaultValue));
+    }
+  }
+
+  @Override
+  public void registerPassword(String key, String defaultValue, String description) {
+    register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, true);
+  }
+
+  @Override
+  public void register(ConfigItem configItem) {
+    String key = addSn(configItem.getKey());
+    Optional<String> i = kvClient.getValueAsString(key);
+
+    if (!i.isPresent()) {
+      // Set the value of environment variable as default
+      String envVariable = System.getenv(configItem.getKey());
+      if (envVariable != null) {
+        configItem.setValue(envVariable);
+        kvClient.putValue(key, toJson(configItem));
+      } else {
+        kvClient.putValue(key, toJson(configItem));
+      }
+    }
+  }
+
+  private void register(String key, String defaultValue, String valueType, String description,
+                        ConfigurationScope configurationScope, boolean isPassword) {
+    ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
+    register(configItem);
+
+    if (configProps != null) {
+      configProps.put(key, getString(key));
+    }
+  }
+
+  @Override
+  public boolean getBoolean(String key) {
+    return Boolean.parseBoolean(getString(key));
+  }
+
+  @Override
+  public int getInteger(String key) {
+    return Integer.parseInt(getString(key));
+  }
+
+  @Override
+  public double getDouble(String key) {
+    return Double.parseDouble(getString(key));
+  }
+
+  @Override
+  public String getString(String key) {
+    return getConfigItem(key).getValue();
+  }
+
+  @Override
+  public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
+    Optional<String> os = kvClient.getValueAsString(addSn(key));
+    if (os.isPresent()) {
+      try {
+        return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
+      } catch (JsonProcessingException e) {
+        LOG.info("Could not deserialize object", e);
+        return defaultValue;
+      }
+    } else {
+      return defaultValue;
+    }
+  }
+
+  @Override
+  public ConfigItem getConfigItem(String key) {
+    Optional<String> os = kvClient.getValueAsString(addSn(key));
+
+    return fromJson(os.get());
+  }
+
+  @Override
+  public void setBoolean(String key, Boolean value) {
+    setString(key, value.toString());
+  }
+
+  @Override
+  public void setInteger(String key, int value) {
+    setString(key, String.valueOf(value));
+  }
+
+  @Override
+  public void setDouble(String key, double value) {
+    setString(key, String.valueOf(value));
+  }
+
+  @Override
+  public void setString(String key, String value) {
+    kvClient.putValue(addSn(key), value);
+  }
+
+  @Override
+  public void setObject(String key, Object value) {
+    kvClient.putValue(addSn(key), toJson(value));
+  }
+
+  private String addSn(String key) {
+    return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
+  }
+
+  private ConfigItem fromJson(String content) {
+    try {
+      return JacksonSerializer.getObjectMapper().readValue(content, ConfigItem.class);
+    } catch (Exception e) {
+      // if old config is used, this is a fallback
+      ConfigItem configItem = new ConfigItem();
+      configItem.setValue(content);
+      return configItem;
+    }
+  }
+
+  private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope,
+                                       boolean password) {
+    ConfigItem configItem = new ConfigItem();
+    configItem.setValueType(valueType);
+    configItem.setDescription(description);
+    configItem.setPassword(password);
+    configItem.setConfigurationScope(configurationScope);
+
+    return configItem;
+  }
+
+  private String toJson(Object object) {
+    try {
+      return JacksonSerializer.getObjectMapper().writeValueAsString(object);
+    } catch (JsonProcessingException e) {
+      LOG.info("Could not serialize object to JSON", e);
+      return "";
+    }
+  }
 
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
index d54277c8a..0138114a5 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
@@ -17,14 +17,15 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.model.ConsulResponse;
 import com.orbitz.consul.model.kv.Value;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,10 +42,10 @@ public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagem
   public <T> T getValueForRoute(String route, Class<T> type) {
     try {
       String entry = getKeyValue(route)
-              .values()
-              .stream()
-              .findFirst()
-              .orElse(null);
+          .values()
+          .stream()
+          .findFirst()
+          .orElse(null);
 
       if (type.equals(Integer.class)) {
         return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 3cc6984b7..8156e2545 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -17,6 +17,13 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
 import com.orbitz.consul.AgentClient;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.model.agent.ImmutableRegCheck;
@@ -24,8 +31,6 @@ import com.orbitz.consul.model.agent.ImmutableRegistration;
 import com.orbitz.consul.model.agent.Registration;
 import com.orbitz.consul.model.health.HealthCheck;
 import com.orbitz.consul.model.health.Service;
-import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,14 +63,14 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   public List<String> getActivePipelineElementEndpoints() {
     LOG.info("Discovering active pipeline element service endpoints");
     return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
-            Collections.singletonList(DefaultSpServiceTags.PE.asString()));
+        Collections.singletonList(DefaultSpServiceTags.PE.asString()));
   }
 
   @Override
   public List<String> getActiveConnectWorkerEndpoints() {
     LOG.info("Discovering active StreamPipes Connect worker service endpoints");
     return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
-            Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
+        Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
   }
 
   @Override
@@ -100,11 +105,13 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   }
 
   private boolean hasExtensionsTag(List<String> tags) {
-    return tags.stream().anyMatch(tag -> tag.equals(DefaultSpServiceTags.PE.asString()) || tag.equals(DefaultSpServiceTags.CONNECT_WORKER.asString()));
+    return tags.stream().anyMatch(tag -> tag.equals(DefaultSpServiceTags.PE.asString())
+        || tag.equals(DefaultSpServiceTags.CONNECT_WORKER.asString()));
   }
 
   private String extractServiceGroup(List<String> tags) {
-    String groupTag = tags.stream().filter(tag -> tag.startsWith(SpServiceTagPrefix.SP_GROUP.asString())).findFirst().orElse("unknown service group");
+    String groupTag = tags.stream().filter(tag -> tag.startsWith(SpServiceTagPrefix.SP_GROUP.asString())).findFirst()
+        .orElse("unknown service group");
     return groupTag.replaceAll(SpServiceTagPrefix.SP_GROUP.asString() + ":", "");
   }
 
@@ -117,18 +124,18 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
 
   private Registration createRegistrationBody(SpServiceRegistrationRequest req) {
     return ImmutableRegistration.builder()
-            .id(req.getSvcId())
-            .name(req.getSvcGroup())
-            .port(req.getPort())
-            .address(HTTP_PROTOCOL + req.getHost())
-            .check(ImmutableRegCheck.builder()
-                    .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
-                    .interval(HEALTH_CHECK_INTERVAL)
-                    .deregisterCriticalServiceAfter("120s")
-                    .status("passing")
-                    .build())
-            .tags(asString(req.getTags()))
-            .enableTagOverride(true)
-            .build();
+        .id(req.getSvcId())
+        .name(req.getSvcGroup())
+        .port(req.getPort())
+        .address(HTTP_PROTOCOL + req.getHost())
+        .check(ImmutableRegCheck.builder()
+            .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
+            .interval(HEALTH_CHECK_INTERVAL)
+            .deregisterCriticalServiceAfter("120s")
+            .status("passing")
+            .build())
+        .tags(asString(req.getTags()))
+        .enableTagOverride(true)
+        .build();
   }
 }