You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/26 21:03:55 UTC

[streampipes] branch 1158-replace-consul-client-library-with-consul-api-library updated: Replace consul-client with consul-api library (#1158)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1158-replace-consul-client-library-with-consul-api-library
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1158-replace-consul-client-library-with-consul-api-library by this push:
     new a84cd474b Replace consul-client with consul-api library (#1158)
a84cd474b is described below

commit a84cd474b2da2f345bbd0940b0ba30ca1ee082b6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 26 22:03:31 2023 +0100

    Replace consul-client with consul-api library (#1158)
---
 .../deploy/standalone/consul/docker-compose.yml    |   6 +-
 ...ive-data-from-the-streampipes-data-stream.ipynb |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |  49 +++++-----
 .../commons/environment/DefaultEnvironment.java    |  47 +++++++++
 .../commons/environment/Environment.java           |  17 ++--
 .../commons/environment/Environments.java          |  16 +---
 .../variable/BooleanEnvironmentVariable.java       |  16 ++--
 .../commons/environment/variable/EnvResolver.java  |  14 +--
 .../environment/variable/EnvironmentVariable.java  |  62 ++++++++++++
 .../variable/IntEnvironmentVariable.java           |  17 ++--
 .../variable/StringEnvironmentVariable.java        |  16 ++--
 .../management/config/ConfigExtractor.java         |   4 +-
 streampipes-integration-tests/pom.xml              |  11 +++
 .../svcdiscovery/AbstractConsulTest.java           |  74 +++++++++++++++
 .../svcdiscovery/ConsulKvManagementTest.java       |  63 +++++++++++++
 .../svcdiscovery/ConsulSpConfigTest.java           |  71 ++++++++++++++
 .../svcdiscovery/ConsulSpServiceDiscoveryTest.java | 105 +++++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  34 +++++++
 streampipes-sdk/pom.xml                            |   6 +-
 .../svcdiscovery/api/ISpKvManagement.java          |   2 -
 .../svcdiscovery/api/model/ConfigItem.java         |  24 +++++
 streampipes-service-discovery-consul/pom.xml       |   9 +-
 .../svcdiscovery/consul/AbstractConsulService.java |  14 +--
 .../consul/ConsulHealthServiceManager.java         |  82 ++++++----------
 .../svcdiscovery/consul/ConsulProvider.java        |  79 ++++++++--------
 .../svcdiscovery/consul/ConsulSpConfig.java        |  63 ++++++-------
 ...bstractConsulService.java => ServiceCache.java} |  27 ++++--
 .../svcdiscovery/consul/SpConsulKvManagement.java  |  55 ++++-------
 .../consul/SpConsulServiceDiscovery.java           |  95 +++++++++++--------
 .../svcdiscovery/SpServiceDiscovery.java           |  21 ++++-
 streampipes-storage-couchdb/pom.xml                |   4 +
 31 files changed, 799 insertions(+), 306 deletions(-)

diff --git a/installer/cli/deploy/standalone/consul/docker-compose.yml b/installer/cli/deploy/standalone/consul/docker-compose.yml
index 49dca0e95..dd26e0af9 100644
--- a/installer/cli/deploy/standalone/consul/docker-compose.yml
+++ b/installer/cli/deploy/standalone/consul/docker-compose.yml
@@ -16,11 +16,13 @@
 version: "3.4"
 services:
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
-      - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
+      - "CONSUL_LOCAL_CONFIG={\"rpc_streaming\": false, \"disable_update_check\": true, \"rpc\": {\"enable_streaming\": false}, \"use_streaming_backend\": false}"
       - "CONSUL_BIND_INTERFACE=eth0"
       - "CONSUL_HTTP_ADDR=0.0.0.0"
+      - "CONSUL_RPC_ENABLE_STREAMING=false"
+      - "CONSUL_USE_STREAMING_BACKEND=false"
     entrypoint:
       - consul
       - agent
diff --git a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 3b69c065d..3f3a3c9c3 100644
--- a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -610,4 +610,4 @@
  },
  "nbformat": 4,
  "nbformat_minor": 2
-}
+}
\ No newline at end of file
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 9ad589bb6..546050058 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -19,35 +19,37 @@ package org.apache.streampipes.commons.constants;
 
 public enum Envs {
 
-  SP_HOST("SP_HOST"),
-  SP_PORT("SP_PORT"),
+  SP_HOST("SP_HOST", null),
+  SP_PORT("SP_PORT", null),
 
   @Deprecated(since = "0.90.0", forRemoval = true)
-  SP_CONSUL_LOCATION("CONSUL_LOCATION"),
+  SP_CONSUL_LOCATION("CONSUL_LOCATION", "consul"),
 
-  SP_CONSUL_HOST("SP_CONSUL_HOST"),
-  SP_CONSUL_PORT("SP_CONSUL_PORT"),
-  SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS"),
-  SP_JWT_SECRET("JWT_SECRET"),
-  SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"),
-  SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"),
-  SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC"),
-  SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL"),
-  SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD"),
-  SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER"),
-  SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET"),
-  SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS"),
-  SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE"),
-  SP_CLIENT_USER("SP_CLIENT_USER"),
-  SP_CLIENT_SECRET("SP_CLIENT_SECRET"),
-  SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE"),
-  SP_DEBUG("SP_DEBUG"),
-  SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN");
+  SP_CONSUL_HOST("SP_CONSUL_HOST", "consul"),
+  SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"),
+  SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", null),
+  SP_JWT_SECRET("JWT_SECRET", null),
+  SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null),
+  SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null),
+  SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null),
+  SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null),
+  SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null),
+  SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null),
+  SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null),
+  SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null),
+  SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null),
+  SP_CLIENT_USER("SP_CLIENT_USER", null),
+  SP_CLIENT_SECRET("SP_CLIENT_SECRET", null),
+  SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null),
+  SP_DEBUG("SP_DEBUG", "false"),
+  SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null);
 
   private final String envVariableName;
+  private final String defaultValue;
 
-  Envs(String envVariableName) {
+  Envs(String envVariableName, String defaultValue) {
     this.envVariableName = envVariableName;
+    this.defaultValue = defaultValue;
   }
 
   public boolean exists() {
@@ -82,4 +84,7 @@ public enum Envs {
     return this.exists() ? this.getValue() : defaultValue;
   }
 
+  public String getDefaultValue() {
+    return defaultValue;
+  }
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
new file mode 100644
index 000000000..0c051ef16
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment;
+
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+
+public class DefaultEnvironment implements Environment {
+
+  @Override
+  public StringEnvironmentVariable getConsulHost() {
+    return new StringEnvironmentVariable(Envs.SP_CONSUL_HOST);
+  }
+
+  @Override
+  public IntEnvironmentVariable getConsulPort() {
+    return new IntEnvironmentVariable(Envs.SP_CONSUL_PORT);
+  }
+
+  @Override
+  public BooleanEnvironmentVariable getSpDebug() {
+    return new BooleanEnvironmentVariable(Envs.SP_DEBUG);
+  }
+
+  @Override
+  public StringEnvironmentVariable getConsulLocation() {
+    return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
+  }
+}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
similarity index 58%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 6c7583df1..b3a3256d2 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -15,18 +15,21 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
 
-public interface ISpKvManagement {
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
 
-  <T> T getValueForRoute(String route, Class<T> type);
+public interface Environment {
 
-  Map<String, String> getKeyValue(String route);
+  StringEnvironmentVariable getConsulHost();
 
-  void updateConfig(String key, String entry, boolean password);
+  IntEnvironmentVariable getConsulPort();
 
-  void deleteConfig(String key);
+  BooleanEnvironmentVariable getSpDebug();
 
+  @Deprecated(since = "0.90.0", forRemoval = true)
+  StringEnvironmentVariable getConsulLocation();
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
index 6c7583df1..631e72dcc 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
@@ -15,18 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
 
-public interface ISpKvManagement {
-
-  <T> T getValueForRoute(String route, Class<T> type);
-
-  Map<String, String> getKeyValue(String route);
-
-  void updateConfig(String key, String entry, boolean password);
-
-  void deleteConfig(String key);
+public class Environments {
 
+  public static Environment getEnvironment() {
+    return new DefaultEnvironment();
+  }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
similarity index 68%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
index aff0f18c3..2c7b3e064 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
+public class BooleanEnvironmentVariable extends EnvironmentVariable<Boolean> {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public BooleanEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public Boolean parse(String value) {
+    return Boolean.parseBoolean(value.toLowerCase());
   }
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
index 6c7583df1..591f0d0f4 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
@@ -15,18 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment.variable;
 
-public interface ISpKvManagement {
-
-  <T> T getValueForRoute(String route, Class<T> type);
-
-  Map<String, String> getKeyValue(String route);
-
-  void updateConfig(String key, String entry, boolean password);
-
-  void deleteConfig(String key);
+public interface EnvResolver<T> {
 
+  T resolve();
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
new file mode 100644
index 000000000..769d5fea9
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment.variable;
+
+import org.apache.streampipes.commons.constants.CustomEnvs;
+import org.apache.streampipes.commons.constants.Envs;
+
+public abstract class EnvironmentVariable<T> {
+
+  private T defaultValue;
+  private String envVariableName;
+
+  public EnvironmentVariable(String envVariableName,
+                             T defaultValue) {
+    this.envVariableName = envVariableName;
+    this.defaultValue = defaultValue;
+  }
+
+  public EnvironmentVariable(Envs envVariable) {
+    this.envVariableName = envVariable.getEnvVariableName();
+    this.defaultValue = parse(envVariable.getDefaultValue());
+  }
+
+  public T getValue() {
+    return parse(CustomEnvs.getEnv(envVariableName));
+  }
+
+  public boolean exists() {
+    return CustomEnvs.exists(envVariableName);
+  }
+
+  public T getValueOrDefault() {
+    return exists() ? getValue() : defaultValue;
+  }
+
+  public T getValueOrReturn(T defaultValue) {
+    return exists() ? getValue() : defaultValue;
+  }
+
+  public T getValueOrResolve(EnvResolver<T> resolver) {
+    return resolver.resolve();
+  }
+
+  public abstract T parse(String value);
+
+}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
similarity index 69%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
index aff0f18c3..6bfb35c2d 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
-
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+public class IntEnvironmentVariable extends EnvironmentVariable<Integer> {
+  public IntEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public Integer parse(String value) {
+    return Integer.parseInt(value);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
similarity index 70%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
index aff0f18c3..eaf891b4b 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
+public class StringEnvironmentVariable extends EnvironmentVariable<String> {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public StringEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public String parse(String value) {
+    return value;
   }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
index 89bc9a700..5c00462a4 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
@@ -17,8 +17,8 @@
  */
 package org.apache.streampipes.extensions.management.config;
 
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.svcdiscovery.consul.ConsulSpConfig;
 
 import java.io.Serializable;
 
@@ -27,7 +27,7 @@ public class ConfigExtractor implements Serializable {
   private SpConfig config;
 
   private ConfigExtractor(String serviceGroup) {
-    this.config = new ConsulSpConfig(serviceGroup);
+    this.config = SpServiceDiscovery.getSpConfig(serviceGroup);
   }
 
   public static ConfigExtractor from(String serviceGroup) {
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index d576bb508..c4a2588ec 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -56,12 +56,23 @@
             <classifier>embed</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-service-discovery</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <version>1.17.4</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>consul</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
new file mode 100644
index 000000000..367da32bd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import org.mockito.Mockito;
+import org.testcontainers.consul.ConsulContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.mockito.Mockito.mock;
+
+public class AbstractConsulTest {
+
+  static final ConsulContainer consulContainer;
+
+  static {
+    consulContainer = new ConsulContainer(
+        DockerImageName.parse("consul").withTag("1.14.3"))
+        .withExposedPorts(8500);
+    consulContainer.start();
+  }
+
+  protected ConfigItem makeConfigItem(String key, String value) {
+    var item = new ConfigItem();
+    item.setKey(key);
+    item.setValue(value);
+    item.setPassword(false);
+
+    return item;
+  }
+
+  protected Environment mockEnvironment() {
+    var envMock = mock(Environment.class);
+    var hostVariableMock = mock(StringEnvironmentVariable.class);
+    var portVariableMock = mock(IntEnvironmentVariable.class);
+    var consulLocationMock = mock(StringEnvironmentVariable.class);
+    var spDebugMock = mock(BooleanEnvironmentVariable.class);
+
+    Mockito.when(hostVariableMock.getValueOrDefault()).thenReturn(consulContainer.getHost());
+    Mockito.when(portVariableMock.getValueOrDefault()).thenReturn(consulContainer.getMappedPort(8500));
+    Mockito.when(consulLocationMock.exists()).thenReturn(false);
+    Mockito.when(spDebugMock.exists()).thenReturn(false);
+    Mockito.when(spDebugMock.getValueOrReturn(false)).thenReturn(false);
+
+    Mockito.when(envMock.getConsulHost()).thenReturn(hostVariableMock);
+    Mockito.when(envMock.getConsulPort()).thenReturn(portVariableMock);
+    Mockito.when(envMock.getConsulLocation()).thenReturn(consulLocationMock);
+    Mockito.when(envMock.getSpDebug()).thenReturn(spDebugMock);
+
+    return envMock;
+
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
new file mode 100644
index 000000000..5b14c0a63
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import com.google.gson.Gson;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsulKvManagementTest extends AbstractConsulTest {
+
+  private ISpKvManagement kvManagement;
+
+  @Before
+  public void before() {
+    var environment = mockEnvironment();
+    this.kvManagement = SpServiceDiscovery
+        .getKeyValueStore(environment);
+  }
+
+  @Test
+  public void testKv() {
+    var test1Key = "a/test";
+    var test2Key = "a/test2";
+    var configItem1 = makeConfigItem(test1Key, "abc");
+    var configItem2 = makeConfigItem(test2Key, "def");
+    kvManagement.updateConfig(test1Key, serialize(configItem1), false);
+    kvManagement.updateConfig(test2Key, serialize(configItem2), false);
+
+    Map<String, String> result = kvManagement.getKeyValue("a");
+
+    assertEquals(2, result.size());
+    assertEquals(serialize(configItem1), result.get(test1Key));
+    assertEquals(serialize(configItem2), result.get(test2Key));
+  }
+
+  private String serialize(ConfigItem item) {
+    return new Gson().toJson(item);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
new file mode 100644
index 000000000..7b64d3d39
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsulSpConfigTest extends AbstractConsulTest {
+
+  private SpConfig spConfig;
+  @Before
+  public void before() {
+    var env = mockEnvironment();
+    this.spConfig = SpServiceDiscovery
+        .getSpConfig("test-service-group", env);
+  }
+
+  @Test
+  public void testKvString() {
+
+    spConfig.register("str", "abc", "desc");
+    var value = spConfig.getString("str");
+    assertEquals("abc", value);
+  }
+
+  @Test
+  public void testKvInteger() {
+    spConfig.register("int", 2, "desc");
+    var value = spConfig.getInteger("int");
+    assertEquals(2, value);
+  }
+
+  @Test
+  public void testKvConfigItem() {
+    var configItem = makeConfigItem("config", "value");
+    spConfig.register(configItem);
+    var value = spConfig.getConfigItem("config");
+    assertEquals(configItem, value);
+  }
+
+  @Test
+  public void testUpdateValue() {
+    spConfig.register("stru", "abc", "desc");
+    var value = spConfig.getString("stru");
+    assertEquals("abc", value);
+    spConfig.setString("stru", "abc2");
+    var result = spConfig.getString("stru");
+    assertEquals("abc2", result);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
new file mode 100644
index 000000000..0a4093c30
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for service registration and lookup through {@link ISpServiceDiscovery}. */
+public class ConsulSpServiceDiscoveryTest extends AbstractConsulTest {
+
+  private ISpServiceDiscovery serviceDiscovery;
+
+  @Before
+  public void before() {
+    this.serviceDiscovery = SpServiceDiscovery.getServiceDiscovery(mockEnvironment());
+  }
+
+  @Test
+  public void testServiceRegistration() {
+    serviceDiscovery.registerService(makeRequest(makeServiceTag(), "abc"));
+    try {
+      assertEquals(1, getEndpoints(false).size());
+    } finally {
+      // Deregister even if the assertion fails so the registration cannot outlive this test.
+      serviceDiscovery.deregisterService("abc");
+    }
+
+    assertEquals(0, getEndpoints(false).size());
+  }
+
+  @Test
+  public void testGetExtensionsServiceGroups() {
+    serviceDiscovery.registerService(makeRequest(List.of(DefaultSpServiceTags.PE, groupTag()), "abc"));
+    serviceDiscovery.registerService(makeRequest(
+        List.of(DefaultSpServiceTags.CORE, groupTag()), "def"));
+    try {
+      var serviceGroups = serviceDiscovery.getExtensionsServiceGroups();
+
+      assertEquals(1, serviceGroups.size());
+      assertTrue(serviceGroups.containsKey("my-group"));
+    } finally {
+      // Cleanup must not depend on the assertions above passing.
+      serviceDiscovery.deregisterService("abc");
+      serviceDiscovery.deregisterService("def");
+    }
+  }
+
+  /** Looks up EXT-group endpoints that carry the first tag from {@link #makeServiceTag()}. */
+  private List<String> getEndpoints(boolean healthy) {
+    return serviceDiscovery.getServiceEndpoints(
+        DefaultSpServiceGroups.EXT,
+        healthy,
+        List.of(makeServiceTag().get(0).asString()));
+  }
+
+  /** Builds a registration request in the EXT group for the given tags and service id. */
+  private SpServiceRegistrationRequest makeRequest(List<SpServiceTag> serviceTags,
+                                                   String serviceId) {
+    return new SpServiceRegistrationRequest(
+        DefaultSpServiceGroups.EXT,
+        serviceId,
+        "localhost",
+        80,
+        serviceTags,
+        "/"
+    );
+  }
+
+  private SpServiceTag groupTag() {
+    return SpServiceTag.create(SpServiceTagPrefix.SP_GROUP, "my-group");
+  }
+
+  private List<SpServiceTag> makeServiceTag() {
+    return List.of(DefaultSpServiceTags.PE);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/resources/logback-test.xml b/streampipes-integration-tests/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..d47f8f1cd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<configuration debug="false" scan="true">
+    <!-- Console appender for test runs; its threshold filter drops events below DEBUG. -->
+    <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+    <!-- Root logger level is INFO; its output goes to the STDOUT appender. -->
+    <root level="INFO">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+</configuration>
diff --git a/streampipes-sdk/pom.xml b/streampipes-sdk/pom.xml
index 0f7b7aaf3..889aa9e5d 100644
--- a/streampipes-sdk/pom.xml
+++ b/streampipes-sdk/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
 
         <!-- External dependencies -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.github.drapostolos</groupId>
             <artifactId>type-parser</artifactId>
@@ -71,4 +75,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
index 6c7583df1..4b859f3c6 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
@@ -21,8 +21,6 @@ import java.util.Map;
 
 public interface ISpKvManagement {
 
-  <T> T getValueForRoute(String route, Class<T> type);
-
   Map<String, String> getKeyValue(String route);
 
   void updateConfig(String key, String entry, boolean password);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
index c3dcc0415..803f4e43b 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.svcdiscovery.api.model;
 
+import java.util.Objects;
+
 public class ConfigItem {
 
   private String key;
@@ -117,4 +119,26 @@ public class ConfigItem {
   public void setConfigurationScope(ConfigurationScope configurationScope) {
     this.configurationScope = configurationScope;
   }
+
+  /** Equal when the runtime classes match and all compared fields are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    } else if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    ConfigItem other = (ConfigItem) o;
+    return isPassword == other.isPassword && configurationScope == other.configurationScope
+        && Objects.equals(key, other.key)
+        && Objects.equals(description, other.description)
+        && Objects.equals(value, other.value)
+        && Objects.equals(valueType, other.valueType);
+  }
+
+  /** Hashes the same six fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, description, value, valueType, configurationScope, isPassword);
+  }
 }
diff --git a/streampipes-service-discovery-consul/pom.xml b/streampipes-service-discovery-consul/pom.xml
index b1cedc09e..a10cad9ef 100644
--- a/streampipes-service-discovery-consul/pom.xml
+++ b/streampipes-service-discovery-consul/pom.xml
@@ -46,8 +46,13 @@
 
         <!-- third-party dependencies -->
         <dependency>
-            <groupId>com.orbitz.consul</groupId>
-            <artifactId>consul-client</artifactId>
+            <groupId>com.ecwid.consul</groupId>
+            <artifactId>consul-api</artifactId>
+            <version>1.4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
index aff0f18c3..d89b387ac 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
@@ -17,17 +17,19 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+import org.apache.streampipes.commons.environment.Environment;
+
+import com.ecwid.consul.v1.ConsulClient;
 
 public abstract class AbstractConsulService {
 
-  protected ConsulHealthServiceManager consul;
+  private final Environment environment;
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public AbstractConsulService(Environment environment) {
+    this.environment = environment;
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  public ConsulClient consulInstance() {
+    return ConsulProvider.INSTANCE.getConsulInstance(environment);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
index d8d2d642b..cb1710483 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
@@ -17,90 +17,64 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.commons.environment.Environment;
 
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.HealthClient;
-import com.orbitz.consul.cache.ServiceHealthCache;
-import com.orbitz.consul.model.health.ServiceHealth;
-import com.orbitz.consul.option.QueryOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.Check;
+import com.ecwid.consul.v1.health.model.HealthService;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public enum ConsulHealthServiceManager {
+public class ConsulHealthServiceManager extends AbstractConsulService {
 
-  INSTANCE;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ConsulHealthServiceManager.class);
   private static final int MAX_RETRIES = 3;
-  private final Consul client;
-  private final Map<String, ServiceHealthCache> serviceHealthCaches;
 
-  ConsulHealthServiceManager() {
-    serviceHealthCaches = new HashMap<>();
-    client = new ConsulProvider().consulInstance();
-    initializeAll();
+  public ConsulHealthServiceManager(Environment environment) {
+    super(environment);
+    initializeCache();
   }
 
-  public void initializeAll() {
-    initialize(DefaultSpServiceGroups.CORE);
-    initialize(DefaultSpServiceGroups.EXT);
-  }
+  public void initializeCache() {
 
-  public void initialize(String serviceGroup) {
-    HealthClient healthClient = client.healthClient();
-    ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, serviceGroup, false, 9, QueryOptions.BLANK);
-    svHealth.start();
-    try {
-      svHealth.awaitInitialized(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    serviceHealthCaches.put(serviceGroup, svHealth);
   }
 
   public List<String> getServiceEndpoints(String serviceGroup,
                                           boolean restrictToHealthy,
                                           List<String> filterByTags) {
-    List<ServiceHealth> activeServices = findService(serviceGroup, 0);
+    List<HealthService> activeServices = findService(serviceGroup, 0);
 
     return activeServices
         .stream()
         .filter(service -> allFiltersSupported(service, filterByTags))
         .filter(service -> !restrictToHealthy
-            || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
+            || service.getChecks().stream().allMatch(check -> check.getStatus() == Check.CheckStatus.PASSING))
         .map(this::makeServiceUrl)
         .collect(Collectors.toList());
   }
 
-  private String makeServiceUrl(ServiceHealth service) {
+  private String makeServiceUrl(HealthService service) {
     return service.getService().getAddress() + ":" + service.getService().getPort();
   }
 
-  private boolean allFiltersSupported(ServiceHealth service,
+  private boolean allFiltersSupported(HealthService service,
                                       List<String> filterByTags) {
-    return service.getService().getTags().containsAll(filterByTags);
+    return new HashSet<>(service.getService().getTags()).containsAll(filterByTags);
   }
 
-  private List<ServiceHealth> findService(String serviceGroup, int retryCount) {
-
-    if (serviceHealthCaches.containsKey(serviceGroup)
-        && serviceHealthCaches.get(serviceGroup).getMap() != null) {
-      ServiceHealthCache cache = serviceHealthCaches.get(serviceGroup);
-      return cache
-          .getMap()
-          .values()
-          .stream()
-          .filter((value) -> value.getService().getService().equals(serviceGroup))
-          .collect(Collectors.toList());
-    } else {
+  private List<HealthService> findService(String serviceGroup, int retryCount) {
+    HealthServicesRequest request = HealthServicesRequest.newBuilder()
+        .setPassing(true)
+        .setQueryParams(QueryParams.DEFAULT)
+        .build();
+    Response<List<HealthService>> healthyServicesResp = consulInstance().getHealthServices(serviceGroup, request);
+    var healthyServices = healthyServicesResp.getValue();
+    if (healthyServices.size() == 0) {
       if (retryCount < MAX_RETRIES) {
         try {
           retryCount++;
@@ -113,10 +87,10 @@ public enum ConsulHealthServiceManager {
       } else {
         return Collections.emptyList();
       }
+    } else {
+      return healthyServices;
     }
   }
 
-  public Consul consulInstance() {
-    return this.client;
-  }
+
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index 69654eaba..cd7282d32 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -18,9 +18,9 @@
 package org.apache.streampipes.svcdiscovery.consul;
 
 import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
 
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.ConsulClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,47 +29,52 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.concurrent.TimeUnit;
 
-public class ConsulProvider {
+public enum ConsulProvider {
+
+  INSTANCE;
 
   private static final Logger LOG = LoggerFactory.getLogger(ConsulProvider.class);
   private static final int CHECK_INTERVAL = 1;
 
-  private final String consulHost;
-  private final String consulUrlString;
-  private final int consulPort;
+  private ConsulClient consulClient;
+  private boolean initialized = false;
 
-  public ConsulProvider() {
-    this.consulHost = getConsulHost();
-    this.consulPort = getConsulPort();
-    this.consulUrlString = makeConsulUrl();
+  ConsulProvider() {
   }
 
-  public Consul consulInstance() {
-    boolean connected;
+  public ConsulClient getConsulInstance(Environment environment) {
+    if (!initialized) {
+      createConsulInstance(environment);
+    }
 
-    do {
-      LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
-      connected = checkConsulAvailable();
+    return consulClient;
+  }
 
-      if (!connected) {
-        LOG.info("Retrying in {} second", CHECK_INTERVAL);
-        try {
-          TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    } while (!connected);
+  /** Waits until {@code checkConsulAvailable} succeeds, then creates and caches the Consul client. */
+  private void createConsulInstance(Environment environment) {
+    var consulHost = getConsulHost(environment);
+    var consulPort = getConsulPort(environment);
 
-    LOG.info("Successfully connected to Consul");
-    return Consul.builder().withUrl(consulUrlString).build();
-  }
+    LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
+    while (!checkConsulAvailable(consulHost, consulPort)) { // loop, not recursion: no stack growth
+      LOG.info("Retrying in {} second", CHECK_INTERVAL);
+      try {
+        TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+        LOG.warn("Interrupted while waiting for Consul on host {}", consulHost, e);
+        return;
+      }
+      LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
+    }
 
-  public String makeConsulUrl() {
-    return "http://" + consulHost + ":" + consulPort;
+    LOG.info("Successfully connected to Consul on host {}", consulHost);
+    this.consulClient = new ConsulClient(consulHost, consulPort);
+    this.initialized = true;
   }
 
-  private boolean checkConsulAvailable() {
+  private boolean checkConsulAvailable(String consulHost,
+                                       int consulPort) {
     try {
       InetSocketAddress sa = new InetSocketAddress(consulHost, consulPort);
       Socket ss = new Socket();
@@ -83,18 +88,18 @@ public class ConsulProvider {
     }
   }
 
-  private int getConsulPort() {
-    return Envs.SP_CONSUL_PORT.getValueAsIntOrDefault(DefaultEnvValues.CONSUL_PORT_DEFAULT);
+  private int getConsulPort(Environment environment) {
+    return environment.getConsulPort().getValueOrDefault();
   }
 
-  private String getConsulHost() {
-    if (Envs.SP_CONSUL_LOCATION.exists()) {
-      return Envs.SP_CONSUL_LOCATION.getValue();
+  private String getConsulHost(Environment environment) {
+    if (environment.getConsulLocation().exists()) {
+      return environment.getConsulLocation().getValue();
     } else {
-      if (Envs.SP_DEBUG.getValueAsBooleanOrDefault(false)) {
+      if (environment.getSpDebug().getValueOrReturn(false)) {
         return DefaultEnvValues.CONSUL_HOST_LOCAL;
       } else {
-        return Envs.SP_CONSUL_HOST.getValueOrDefault(DefaultEnvValues.CONSUL_HOST_DEFAULT);
+        return environment.getConsulHost().getValueOrDefault();
       }
     }
   }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index 9f3f6ddec..73770eeea 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -18,35 +18,30 @@
 
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItemUtils;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
 
+import com.ecwid.consul.v1.ConsulClient;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Optional;
-
 public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
   private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
   private static final String SLASH = "/";
   private final String serviceName;
-  private final KeyValueClient kvClient;
-
-  // TODO Implement mechanism to update the client when some configuration parameters change in Consul
-  private Map<String, Object> configProps;
+  private final ConsulClient kvClient;
 
-  public ConsulSpConfig(String serviceName) {
-    Consul consul = consulInstance();
-    this.kvClient = consul.keyValueClient();
+  public ConsulSpConfig(String serviceName,
+                        Environment environment) {
+    super(environment);
+    this.kvClient = consulInstance();
     this.serviceName = serviceName;
   }
 
@@ -82,9 +77,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public void registerObject(String key, Object defaultValue, String description) {
-    Optional<String> i = kvClient.getValueAsString(addSn(key));
-    if (!i.isPresent()) {
-      kvClient.putValue(addSn(key), toJson(defaultValue));
+    var i = kvClient.getKVValue(addSn(key));
+    if (i.getValue() == null) {
+      kvClient.setKVValue(addSn(key), toJson(defaultValue));
     }
   }
 
@@ -96,28 +91,28 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
   @Override
   public void register(ConfigItem configItem) {
     String key = addSn(configItem.getKey());
-    Optional<String> i = kvClient.getValueAsString(key);
+    var i = kvClient.getKVValue(key);
 
-    if (!i.isPresent()) {
+    if (i.getValue() == null) {
       // Set the value of environment variable as default
       String envVariable = System.getenv(configItem.getKey());
       if (envVariable != null) {
         configItem.setValue(envVariable);
-        kvClient.putValue(key, toJson(configItem));
+        kvClient.setKVValue(key, toJson(configItem));
       } else {
-        kvClient.putValue(key, toJson(configItem));
+        kvClient.setKVValue(key, toJson(configItem));
       }
     }
   }
 
-  private void register(String key, String defaultValue, String valueType, String description,
-                        ConfigurationScope configurationScope, boolean isPassword) {
+  private void register(String key,
+                        String defaultValue,
+                        String valueType,
+                        String description,
+                        ConfigurationScope configurationScope,
+                        boolean isPassword) {
     ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
     register(configItem);
-
-    if (configProps != null) {
-      configProps.put(key, getString(key));
-    }
   }
 
   @Override
@@ -142,10 +137,10 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
-    Optional<String> os = kvClient.getValueAsString(addSn(key));
-    if (os.isPresent()) {
+    var os = kvClient.getKVValue(addSn(key));
+    if (os.getValue() != null) {
       try {
-        return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
+        return JacksonSerializer.getObjectMapper().readValue(os.getValue().getDecodedValue(), clazz);
       } catch (JsonProcessingException e) {
         LOG.info("Could not deserialize object", e);
         return defaultValue;
@@ -157,9 +152,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public ConfigItem getConfigItem(String key) {
-    Optional<String> os = kvClient.getValueAsString(addSn(key));
+    var os = kvClient.getKVValue(addSn(key)).getValue();
 
-    return fromJson(os.get());
+    return fromJson(os.getDecodedValue());
   }
 
   @Override
@@ -179,12 +174,12 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public void setString(String key, String value) {
-    kvClient.putValue(addSn(key), value);
+    kvClient.setKVValue(addSn(key), value);
   }
 
   @Override
   public void setObject(String key, Object value) {
-    kvClient.putValue(addSn(key), toJson(value));
+    kvClient.setKVValue(addSn(key), toJson(value));
   }
 
   private String addSn(String key) {
@@ -202,7 +197,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
     }
   }
 
-  private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope,
+  private ConfigItem prepareConfigItem(String valueType,
+                                       String description,
+                                       ConfigurationScope configurationScope,
                                        boolean password) {
     ConfigItem configItem = new ConfigItem();
     configItem.setValueType(valueType);
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
similarity index 61%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
index aff0f18c3..1020553cd 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
@@ -15,19 +15,30 @@
  * limitations under the License.
  *
  */
+
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 
-public abstract class AbstractConsulService {
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-  protected ConsulHealthServiceManager consul;
+public enum ServiceCache {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
-  }
+  INSTANCE;
+
+  private LoadingCache<String, List<HealthService>> cache;
+
+  ServiceCache() {
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
   }
+
+
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
index 0138114a5..31247feca 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
@@ -17,15 +17,11 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
-import com.orbitz.consul.model.ConsulResponse;
-import com.orbitz.consul.model.kv.Value;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,47 +29,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagement {
+public class SpConsulKvManagement extends AbstractConsulService implements ISpKvManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(SpConsulKvManagement.class);
 
   private static final String CONSUL_NAMESPACE = "/sp/v1/";
 
-  public <T> T getValueForRoute(String route, Class<T> type) {
-    try {
-      String entry = getKeyValue(route)
-          .values()
-          .stream()
-          .findFirst()
-          .orElse(null);
-
-      if (type.equals(Integer.class)) {
-        return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      } else if (type.equals(Boolean.class)) {
-        return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      } else {
-        return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      }
-    } catch (JsonProcessingException e) {
-      e.printStackTrace();
-    }
-    throw new IllegalArgumentException("Cannot get entry from Consul");
+  public SpConsulKvManagement(Environment environment) {
+    super(environment);
   }
 
   public Map<String, String> getKeyValue(String route) {
-    Consul consul = consulInstance();
-    KeyValueClient keyValueClient = consul.keyValueClient();
+    var consul = consulInstance();
 
     Map<String, String> keyValues = new HashMap<>();
+    Response<List<GetValue>> consulResponseWithValues = consul.getKVValues(route);
 
-    ConsulResponse<List<Value>> consulResponseWithValues = keyValueClient.getConsulResponseWithValues(route);
-
-    if (consulResponseWithValues.getResponse() != null) {
-      for (Value value : consulResponseWithValues.getResponse()) {
+    if (consulResponseWithValues.getValue() != null) {
+      for (GetValue value : consulResponseWithValues.getValue()) {
         String key = value.getKey();
         String v = "";
-        if (value.getValueAsString().isPresent()) {
-          v = value.getValueAsString().get();
+        if (value.getValue() != null) {
+          v = value.getDecodedValue();
         }
         keyValues.put(key, v);
       }
@@ -82,16 +59,30 @@ public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagem
   }
 
+  /**
+   * Writes {@code entry} under {@code key} in the Consul key-value store.
+   * Nothing is written or logged when {@code password} is {@code true}.
+   *
+   * @param key      key passed unchanged to Consul (no namespace prefix is added here)
+   * @param entry    value to store
+   * @param password when {@code true}, the update is skipped
+   */
   public void updateConfig(String key, String entry, boolean password) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     if (!password) {
-      LOG.info("Updated config - key:" + key + " value: " + entry);
-      consul.keyValueClient().putValue(key, entry);
+      consul.setKVValue(key, entry);
+      // Log only after the write call returned, so the message is not emitted when the call throws.
+      LOG.info("Updated config - key:{} value: {}", key, entry);
     }
   }
 
+  /**
+   * Deletes the entry stored under {@code CONSUL_NAMESPACE + key}.
+   *
+   * @param key key to delete; the Consul namespace prefix is prepended here
+   */
   public void deleteConfig(String key) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     LOG.info("Delete config: {}", key);
-    consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
+    consul.deleteKVValue(CONSUL_NAMESPACE + key);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 8156e2545..a00acf575 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
@@ -24,13 +25,10 @@ import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationReques
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
-import com.orbitz.consul.AgentClient;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.model.agent.ImmutableRegCheck;
-import com.orbitz.consul.model.agent.ImmutableRegistration;
-import com.orbitz.consul.model.agent.Registration;
-import com.orbitz.consul.model.health.HealthCheck;
-import com.orbitz.consul.model.health.Service;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.Check;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.agent.model.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +47,22 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   private static final String COLON = ":";
   private static final String HEALTH_CHECK_INTERVAL = "10s";
 
+  // Assigned once in the constructor; final so it cannot be swapped out later.
+  private final ConsulHealthServiceManager healthServiceManager;
+
+  /**
+   * Creates the service discovery together with its health service manager.
+   *
+   * @param environment environment handed to the superclass and to the health service manager
+   */
+  public SpConsulServiceDiscovery(Environment environment) {
+    super(environment);
+    this.healthServiceManager = new ConsulHealthServiceManager(environment);
+  }
+
   @Override
   public void registerService(SpServiceRegistrationRequest req) {
-    consulInstance().agentClient().register((createRegistrationBody(req)));
-    LOG.info("Successfully registered service at Consul: " + req.getSvcId());
+    consulInstance().agentServiceRegister(createRegistrationBody(req));
+    LOG.info("Successfully registered service at Consul: {}", req.getSvcId());
   }
 
@@ -75,30 +80,42 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
 
   @Override
   public List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy, List<String> filterByTags) {
-    return ConsulHealthServiceManager.INSTANCE.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
+    return healthServiceManager.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
   }
 
+  /**
+   * Maps the service group of every agent service accepted by {@code hasExtensionsTag} to the
+   * status of its {@code service:<key>} check. Services without such a check are reported as
+   * {@code critical}.
+   *
+   * @return service group to lower-case check status; empty when Consul returns no services
+   */
   @Override
   public Map<String, String> getExtensionsServiceGroups() {
     LOG.info("Load pipeline element service status");
-    Consul consul = consulInstance();
-    AgentClient agent = consul.agentClient();
+    var consul = consulInstance();
 
-    Map<String, Service> services = consul.agentClient().getServices();
-    Map<String, HealthCheck> checks = agent.getChecks();
+    Response<Map<String, Service>> servicesResp = consul.getAgentServices();
+    Response<Map<String, Check>> checksResp = consul.getAgentChecks();
 
     Map<String, String> peSvcs = new HashMap<>();
-
-    for (Map.Entry<String, Service> entry : services.entrySet()) {
-      if (hasExtensionsTag(entry.getValue().getTags())) {
-        String serviceId = entry.getValue().getId();
-        String serviceStatus = "critical";
-        if (checks.containsKey("service:" + entry.getKey())) {
-          serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
+    if (servicesResp.getValue() != null) {
+      var services = servicesResp.getValue();
+      // Guard the checks payload like the services payload so a missing value cannot cause an NPE.
+      Map<String, Check> checks = checksResp.getValue() != null ? checksResp.getValue() : Map.of();
+      for (Map.Entry<String, Service> entry : services.entrySet()) {
+        if (hasExtensionsTag(entry.getValue().getTags())) {
+          String serviceId = entry.getValue().getId();
+          String serviceStatus = "critical";
+          var check = checks.get("service:" + entry.getKey());
+          if (check != null && check.getStatus() != null) {
+            // Lower-case the enum constant so it matches the "critical" default used above.
+            serviceStatus = check.getStatus().name().toLowerCase(java.util.Locale.ROOT);
+          }
+          LOG.info("Service id: {} service status: {}", serviceId, serviceStatus);
+          String serviceGroup = extractServiceGroup(entry.getValue().getTags());
+          peSvcs.put(serviceGroup, serviceStatus);
         }
-        LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
-        String serviceGroup = extractServiceGroup(entry.getValue().getTags());
-        peSvcs.put(serviceGroup, serviceStatus);
       }
     }
     return peSvcs;
@@ -117,25 +124,47 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
 
   @Override
   public void deregisterService(String svcId) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     LOG.info("Deregister service: " + svcId);
-    consul.agentClient().deregister(svcId);
+    consul.agentServiceDeregister(svcId);
   }
 
-  private Registration createRegistrationBody(SpServiceRegistrationRequest req) {
-    return ImmutableRegistration.builder()
-        .id(req.getSvcId())
-        .name(req.getSvcGroup())
-        .port(req.getPort())
-        .address(HTTP_PROTOCOL + req.getHost())
-        .check(ImmutableRegCheck.builder()
-            .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
-            .interval(HEALTH_CHECK_INTERVAL)
-            .deregisterCriticalServiceAfter("120s")
-            .status("passing")
-            .build())
-        .tags(asString(req.getTags()))
-        .enableTagOverride(true)
-        .build();
+  /**
+   * Translates a registration request into the {@link NewService} body sent to the Consul agent.
+   *
+   * @param req registration request providing id, group, host, port, tags and health check path
+   * @return service definition carrying the HTTP health check, with tag override enabled
+   */
+  private NewService createRegistrationBody(SpServiceRegistrationRequest req) {
+    var registration = new NewService();
+
+    registration.setId(req.getSvcId());
+    registration.setName(req.getSvcGroup());
+    registration.setAddress(HTTP_PROTOCOL + req.getHost());
+    registration.setPort(req.getPort());
+    registration.setTags(asString(req.getTags()));
+    registration.setEnableTagOverride(true);
+    registration.setCheck(createServiceCheck(req));
+
+    return registration;
+  }
+
+  /**
+   * Creates the HTTP health check attached to a service registration.
+   *
+   * @param req registration request providing host, port and health check path
+   * @return check with interval {@code HEALTH_CHECK_INTERVAL}, initial status {@code passing}
+   *         and {@code DeregisterCriticalServiceAfter} set to {@code 120s}
+   */
+  private NewService.Check createServiceCheck(SpServiceRegistrationRequest req) {
+    var healthUrl = HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath();
+    var check = new NewService.Check();
+
+    check.setStatus("passing");
+    check.setInterval(HEALTH_CHECK_INTERVAL);
+    check.setDeregisterCriticalServiceAfter("120s");
+    check.setHttp(healthUrl);
+
+    return check;
   }
 }
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
index 8f836b077..cead1334b 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.svcdiscovery;
 
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
@@ -27,15 +29,55 @@ import org.apache.streampipes.svcdiscovery.consul.SpConsulServiceDiscovery;
 public class SpServiceDiscovery {
 
+  /**
+   * Returns a Consul-backed service discovery for {@code Environments.getEnvironment()}.
+   */
   public static ISpServiceDiscovery getServiceDiscovery() {
-    return new SpConsulServiceDiscovery();
+    return getServiceDiscovery(Environments.getEnvironment());
+  }
+
+  /**
+   * Returns a Consul-backed service discovery for the given environment.
+   *
+   * @param environment environment passed to the Consul service discovery
+   */
+  public static ISpServiceDiscovery getServiceDiscovery(Environment environment) {
+    return new SpConsulServiceDiscovery(environment);
   }
 
+  /**
+   * Returns a Consul-backed key-value store for {@code Environments.getEnvironment()}.
+   */
   public static ISpKvManagement getKeyValueStore() {
-    return new SpConsulKvManagement();
+    return getKeyValueStore(Environments.getEnvironment());
+  }
+
+  /**
+   * Returns a Consul-backed key-value store for the given environment.
+   *
+   * @param environment environment passed to the Consul key-value management
+   */
+  public static ISpKvManagement getKeyValueStore(Environment environment) {
+    return new SpConsulKvManagement(environment);
   }
 
+  /**
+   * Returns the Consul-backed configuration for {@code Environments.getEnvironment()}.
+   *
+   * @param serviceGroup service group passed to the Consul configuration
+   */
   public static SpConfig getSpConfig(String serviceGroup) {
-    return new ConsulSpConfig(serviceGroup);
+    return getSpConfig(serviceGroup, Environments.getEnvironment());
+  }
+
+  /**
+   * Returns the Consul-backed configuration for the given environment.
+   *
+   * @param serviceGroup service group passed to the Consul configuration
+   * @param environment  environment passed to the Consul configuration
+   */
+  public static SpConfig getSpConfig(String serviceGroup,
+                                     Environment environment) {
+    return new ConsulSpConfig(serviceGroup, environment);
   }
 
 }
diff --git a/streampipes-storage-couchdb/pom.xml b/streampipes-storage-couchdb/pom.xml
index 4a0c28cdb..f0346cd60 100644
--- a/streampipes-storage-couchdb/pom.xml
+++ b/streampipes-storage-couchdb/pom.xml
@@ -56,6 +56,10 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.lightcouch</groupId>
             <artifactId>lightcouch</artifactId>