You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/27 21:08:28 UTC

[streampipes] branch dev updated (3abfd88fe -> 00a043c3f)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


    from 3abfd88fe Merge pull request #1166 from SteveYurongSu/iotdb-session-client
     new e02d3bf5e Replace consul-client with consul-api library (#1158)
     new bac4d45e6 Remove obsolete class (#1158)
     new 19f525019 Fix checkstyle (#1158)
     new ca9d9b33b Bump consul version to 1.14.3 (#1157)
     new 00a043c3f [hotfix] Update Consul version in root docker-compose.yml

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker-compose.yml                                 |   6 +-
 .../deploy/standalone/consul/docker-compose.yml    |   6 +-
 installer/compose/docker-compose.full.yml          |   2 +-
 installer/compose/docker-compose.nats.yml          |   2 +-
 installer/compose/docker-compose.yml               |   2 +-
 .../external/consul/consul-deployment.yaml         |   2 +-
 installer/k8s/values.yaml                          |   2 +-
 ...ive-data-from-the-streampipes-data-stream.ipynb |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |  49 +++++-----
 .../commons/environment/DefaultEnvironment.java    |  37 ++++----
 .../commons/environment/Environment.java           |  26 ++---
 .../commons/environment/Environments.java          |  11 ++-
 .../variable/BooleanEnvironmentVariable.java}      |  13 ++-
 .../variable/EnvResolver.java}                     |   5 +-
 .../environment/variable/EnvironmentVariable.java  |  62 ++++++++++++
 .../variable/IntEnvironmentVariable.java}          |  10 +-
 .../variable/StringEnvironmentVariable.java}       |  11 ++-
 .../management/config/ConfigExtractor.java         |   4 +-
 streampipes-integration-tests/pom.xml              |  11 +++
 .../svcdiscovery/AbstractConsulTest.java           |  74 +++++++++++++++
 .../svcdiscovery/ConsulKvManagementTest.java       |  63 +++++++++++++
 .../svcdiscovery/ConsulSpConfigTest.java           |  71 ++++++++++++++
 .../svcdiscovery/ConsulSpServiceDiscoveryTest.java | 105 +++++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  20 ++--
 streampipes-sdk/pom.xml                            |   6 +-
 .../svcdiscovery/api/ISpKvManagement.java          |   2 -
 .../svcdiscovery/api/model/ConfigItem.java         |  24 +++++
 streampipes-service-discovery-consul/pom.xml       |   9 +-
 .../svcdiscovery/consul/AbstractConsulService.java |  14 +--
 .../consul/ConsulHealthServiceManager.java         |  82 ++++++----------
 .../svcdiscovery/consul/ConsulProvider.java        |  79 ++++++++--------
 .../svcdiscovery/consul/ConsulSpConfig.java        |  63 ++++++-------
 .../svcdiscovery/consul/SpConsulKvManagement.java  |  55 ++++-------
 .../consul/SpConsulServiceDiscovery.java           |  95 +++++++++++--------
 .../svcdiscovery/SpServiceDiscovery.java           |  21 ++++-
 streampipes-storage-couchdb/pom.xml                |   4 +
 36 files changed, 739 insertions(+), 311 deletions(-)
 copy streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java => streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java (50%)
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/StreamIdentifier.java => streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java (62%)
 copy streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/HavingClause.java => streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java (83%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/{parser/BooleanParser.java => environment/variable/BooleanEnvironmentVariable.java} (72%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/{exceptions/NoValidSepaTypeException.java => environment/variable/EnvResolver.java} (88%)
 create mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/{parser/IntegerParser.java => environment/variable/IntEnvironmentVariable.java} (76%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/{parser/StringParser.java => environment/variable/StringEnvironmentVariable.java} (75%)
 create mode 100644 streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
 create mode 100644 streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
 create mode 100644 streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
 create mode 100644 streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
 copy streampipes-service-base/src/main/resources/logback-spring.xml => streampipes-integration-tests/src/test/resources/logback-test.xml (67%)


[streampipes] 02/05: Remove obsolete class (#1158)

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit bac4d45e63a0013172394dd898760782289bade2
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 26 22:23:28 2023 +0100

    Remove obsolete class (#1158)
---
 .../svcdiscovery/consul/ServiceCache.java          | 44 ----------------------
 1 file changed, 44 deletions(-)

diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
deleted file mode 100644
index 1020553cd..000000000
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.svcdiscovery.consul;
-
-import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.health.HealthServicesRequest;
-import com.ecwid.consul.v1.health.model.HealthService;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public enum ServiceCache {
-
-  INSTANCE;
-
-  private LoadingCache<String, List<HealthService>> cache;
-
-  ServiceCache() {
-
-  }
-
-
-}


[streampipes] 03/05: Fix checkstyle (#1158)

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 19f52501985707e6200a977040a4aab40d2b654d
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 26 22:43:26 2023 +0100

    Fix checkstyle (#1158)
---
 .../integration/svcdiscovery/AbstractConsulTest.java           | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
index 367da32bd..76f3444f0 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
@@ -32,13 +32,13 @@ import static org.mockito.Mockito.mock;
 
 public class AbstractConsulTest {
 
-  static final ConsulContainer consulContainer;
+  static final ConsulContainer CONSUL_CONTAINER;
 
   static {
-    consulContainer = new ConsulContainer(
+    CONSUL_CONTAINER = new ConsulContainer(
         DockerImageName.parse("consul").withTag("1.14.3"))
         .withExposedPorts(8500);
-    consulContainer.start();
+    CONSUL_CONTAINER.start();
   }
 
   protected ConfigItem makeConfigItem(String key, String value) {
@@ -57,8 +57,8 @@ public class AbstractConsulTest {
     var consulLocationMock = mock(StringEnvironmentVariable.class);
     var spDebugMock = mock(BooleanEnvironmentVariable.class);
 
-    Mockito.when(hostVariableMock.getValueOrDefault()).thenReturn(consulContainer.getHost());
-    Mockito.when(portVariableMock.getValueOrDefault()).thenReturn(consulContainer.getMappedPort(8500));
+    Mockito.when(hostVariableMock.getValueOrDefault()).thenReturn(CONSUL_CONTAINER.getHost());
+    Mockito.when(portVariableMock.getValueOrDefault()).thenReturn(CONSUL_CONTAINER.getMappedPort(8500));
     Mockito.when(consulLocationMock.exists()).thenReturn(false);
     Mockito.when(spDebugMock.exists()).thenReturn(false);
     Mockito.when(spDebugMock.getValueOrReturn(false)).thenReturn(false);


[streampipes] 01/05: Replace consul-client with consul-api library (#1158)

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit e02d3bf5ea454e02f355e97599013cc637f7dc10
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 26 22:03:31 2023 +0100

    Replace consul-client with consul-api library (#1158)
---
 .../deploy/standalone/consul/docker-compose.yml    |   6 +-
 ...ive-data-from-the-streampipes-data-stream.ipynb |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |  49 +++++-----
 .../commons/environment/DefaultEnvironment.java    |  47 +++++++++
 .../commons/environment/Environment.java           |  17 ++--
 .../commons/environment/Environments.java          |  16 +---
 .../variable/BooleanEnvironmentVariable.java       |  16 ++--
 .../commons/environment/variable/EnvResolver.java  |  14 +--
 .../environment/variable/EnvironmentVariable.java  |  62 ++++++++++++
 .../variable/IntEnvironmentVariable.java           |  17 ++--
 .../variable/StringEnvironmentVariable.java        |  16 ++--
 .../management/config/ConfigExtractor.java         |   4 +-
 streampipes-integration-tests/pom.xml              |  11 +++
 .../svcdiscovery/AbstractConsulTest.java           |  74 +++++++++++++++
 .../svcdiscovery/ConsulKvManagementTest.java       |  63 +++++++++++++
 .../svcdiscovery/ConsulSpConfigTest.java           |  71 ++++++++++++++
 .../svcdiscovery/ConsulSpServiceDiscoveryTest.java | 105 +++++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  34 +++++++
 streampipes-sdk/pom.xml                            |   6 +-
 .../svcdiscovery/api/ISpKvManagement.java          |   2 -
 .../svcdiscovery/api/model/ConfigItem.java         |  24 +++++
 streampipes-service-discovery-consul/pom.xml       |   9 +-
 .../svcdiscovery/consul/AbstractConsulService.java |  14 +--
 .../consul/ConsulHealthServiceManager.java         |  82 ++++++----------
 .../svcdiscovery/consul/ConsulProvider.java        |  79 ++++++++--------
 .../svcdiscovery/consul/ConsulSpConfig.java        |  63 ++++++-------
 ...bstractConsulService.java => ServiceCache.java} |  27 ++++--
 .../svcdiscovery/consul/SpConsulKvManagement.java  |  55 ++++-------
 .../consul/SpConsulServiceDiscovery.java           |  95 +++++++++++--------
 .../svcdiscovery/SpServiceDiscovery.java           |  21 ++++-
 streampipes-storage-couchdb/pom.xml                |   4 +
 31 files changed, 799 insertions(+), 306 deletions(-)

diff --git a/installer/cli/deploy/standalone/consul/docker-compose.yml b/installer/cli/deploy/standalone/consul/docker-compose.yml
index 49dca0e95..dd26e0af9 100644
--- a/installer/cli/deploy/standalone/consul/docker-compose.yml
+++ b/installer/cli/deploy/standalone/consul/docker-compose.yml
@@ -16,11 +16,13 @@
 version: "3.4"
 services:
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
-      - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
+      - "CONSUL_LOCAL_CONFIG={\"rpc_streaming\": false, \"disable_update_check\": true, \"rpc\": {\"enable_streaming\": false}, \"use_streaming_backend\": false}"
       - "CONSUL_BIND_INTERFACE=eth0"
       - "CONSUL_HTTP_ADDR=0.0.0.0"
+      - "CONSUL_RPC_ENABLE_STREAMING=false"
+      - "CONSUL_USE_STREAMING_BACKEND=false"
     entrypoint:
       - consul
       - agent
diff --git a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 3b69c065d..3f3a3c9c3 100644
--- a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -610,4 +610,4 @@
  },
  "nbformat": 4,
  "nbformat_minor": 2
-}
+}
\ No newline at end of file
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 9ad589bb6..546050058 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -19,35 +19,37 @@ package org.apache.streampipes.commons.constants;
 
 public enum Envs {
 
-  SP_HOST("SP_HOST"),
-  SP_PORT("SP_PORT"),
+  SP_HOST("SP_HOST", null),
+  SP_PORT("SP_PORT", null),
 
   @Deprecated(since = "0.90.0", forRemoval = true)
-  SP_CONSUL_LOCATION("CONSUL_LOCATION"),
+  SP_CONSUL_LOCATION("CONSUL_LOCATION", "consul"),
 
-  SP_CONSUL_HOST("SP_CONSUL_HOST"),
-  SP_CONSUL_PORT("SP_CONSUL_PORT"),
-  SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS"),
-  SP_JWT_SECRET("JWT_SECRET"),
-  SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"),
-  SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"),
-  SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC"),
-  SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL"),
-  SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD"),
-  SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER"),
-  SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET"),
-  SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS"),
-  SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE"),
-  SP_CLIENT_USER("SP_CLIENT_USER"),
-  SP_CLIENT_SECRET("SP_CLIENT_SECRET"),
-  SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE"),
-  SP_DEBUG("SP_DEBUG"),
-  SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN");
+  SP_CONSUL_HOST("SP_CONSUL_HOST", "consul"),
+  SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"),
+  SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", null),
+  SP_JWT_SECRET("JWT_SECRET", null),
+  SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null),
+  SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null),
+  SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null),
+  SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null),
+  SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null),
+  SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null),
+  SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null),
+  SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null),
+  SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null),
+  SP_CLIENT_USER("SP_CLIENT_USER", null),
+  SP_CLIENT_SECRET("SP_CLIENT_SECRET", null),
+  SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null),
+  SP_DEBUG("SP_DEBUG", "false"),
+  SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null);
 
   private final String envVariableName;
+  private final String defaultValue;
 
-  Envs(String envVariableName) {
+  Envs(String envVariableName, String defaultValue) {
     this.envVariableName = envVariableName;
+    this.defaultValue = defaultValue;
   }
 
   public boolean exists() {
@@ -82,4 +84,7 @@ public enum Envs {
     return this.exists() ? this.getValue() : defaultValue;
   }
 
+  public String getDefaultValue() {
+    return defaultValue;
+  }
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
new file mode 100644
index 000000000..0c051ef16
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment;
+
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+
+public class DefaultEnvironment implements Environment {
+
+  @Override
+  public StringEnvironmentVariable getConsulHost() {
+    return new StringEnvironmentVariable(Envs.SP_CONSUL_HOST);
+  }
+
+  @Override
+  public IntEnvironmentVariable getConsulPort() {
+    return new IntEnvironmentVariable(Envs.SP_CONSUL_PORT);
+  }
+
+  @Override
+  public BooleanEnvironmentVariable getSpDebug() {
+    return new BooleanEnvironmentVariable(Envs.SP_DEBUG);
+  }
+
+  @Override
+  public StringEnvironmentVariable getConsulLocation() {
+    return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
+  }
+}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
similarity index 58%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 6c7583df1..b3a3256d2 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -15,18 +15,21 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
 
-public interface ISpKvManagement {
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
 
-  <T> T getValueForRoute(String route, Class<T> type);
+public interface Environment {
 
-  Map<String, String> getKeyValue(String route);
+  StringEnvironmentVariable getConsulHost();
 
-  void updateConfig(String key, String entry, boolean password);
+  IntEnvironmentVariable getConsulPort();
 
-  void deleteConfig(String key);
+  BooleanEnvironmentVariable getSpDebug();
 
+  @Deprecated(since = "0.90.0", forRemoval = true)
+  StringEnvironmentVariable getConsulLocation();
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
index 6c7583df1..631e72dcc 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
@@ -15,18 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
 
-public interface ISpKvManagement {
-
-  <T> T getValueForRoute(String route, Class<T> type);
-
-  Map<String, String> getKeyValue(String route);
-
-  void updateConfig(String key, String entry, boolean password);
-
-  void deleteConfig(String key);
+public class Environments {
 
+  public static Environment getEnvironment() {
+    return new DefaultEnvironment();
+  }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
similarity index 68%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
index aff0f18c3..2c7b3e064 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
+public class BooleanEnvironmentVariable extends EnvironmentVariable<Boolean> {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public BooleanEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public Boolean parse(String value) {
+    return Boolean.parseBoolean(value.toLowerCase());
   }
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
index 6c7583df1..591f0d0f4 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
@@ -15,18 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api;
 
-import java.util.Map;
+package org.apache.streampipes.commons.environment.variable;
 
-public interface ISpKvManagement {
-
-  <T> T getValueForRoute(String route, Class<T> type);
-
-  Map<String, String> getKeyValue(String route);
-
-  void updateConfig(String key, String entry, boolean password);
-
-  void deleteConfig(String key);
+public interface EnvResolver<T> {
 
+  T resolve();
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
new file mode 100644
index 000000000..769d5fea9
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment.variable;
+
+import org.apache.streampipes.commons.constants.CustomEnvs;
+import org.apache.streampipes.commons.constants.Envs;
+
+public abstract class EnvironmentVariable<T> {
+
+  private T defaultValue;
+  private String envVariableName;
+
+  public EnvironmentVariable(String envVariableName,
+                             T defaultValue) {
+    this.envVariableName = envVariableName;
+    this.defaultValue = defaultValue;
+  }
+
+  public EnvironmentVariable(Envs envVariable) {
+    this.envVariableName = envVariable.getEnvVariableName();
+    this.defaultValue = parse(envVariable.getDefaultValue());
+  }
+
+  public T getValue() {
+    return parse(CustomEnvs.getEnv(envVariableName));
+  }
+
+  public boolean exists() {
+    return CustomEnvs.exists(envVariableName);
+  }
+
+  public T getValueOrDefault() {
+    return exists() ? getValue() : defaultValue;
+  }
+
+  public T getValueOrReturn(T defaultValue) {
+    return exists() ? getValue() : defaultValue;
+  }
+
+  public T getValueOrResolve(EnvResolver<T> resolver) {
+    return resolver.resolve();
+  }
+
+  public abstract T parse(String value);
+
+}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
similarity index 69%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
index aff0f18c3..6bfb35c2d 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
-
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+public class IntEnvironmentVariable extends EnvironmentVariable<Integer> {
+  public IntEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public Integer parse(String value) {
+    return Integer.parseInt(value);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
similarity index 70%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
index aff0f18c3..eaf891b4b 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
 
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
 
-  protected ConsulHealthServiceManager consul;
+public class StringEnvironmentVariable extends EnvironmentVariable<String> {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public StringEnvironmentVariable(Envs envVariable) {
+    super(envVariable);
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  @Override
+  public String parse(String value) {
+    return value;
   }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
index 89bc9a700..5c00462a4 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
@@ -17,8 +17,8 @@
  */
 package org.apache.streampipes.extensions.management.config;
 
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.svcdiscovery.consul.ConsulSpConfig;
 
 import java.io.Serializable;
 
@@ -27,7 +27,7 @@ public class ConfigExtractor implements Serializable {
   private SpConfig config;
 
   private ConfigExtractor(String serviceGroup) {
-    this.config = new ConsulSpConfig(serviceGroup);
+    this.config = SpServiceDiscovery.getSpConfig(serviceGroup);
   }
 
   public static ConfigExtractor from(String serviceGroup) {
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index d576bb508..c4a2588ec 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -56,12 +56,23 @@
             <classifier>embed</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-service-discovery</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <version>1.17.4</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>consul</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
new file mode 100644
index 000000000..367da32bd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import org.mockito.Mockito;
+import org.testcontainers.consul.ConsulContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.mockito.Mockito.mock;
+
+public class AbstractConsulTest {
+
+  static final ConsulContainer consulContainer;
+
+  static {
+    consulContainer = new ConsulContainer(
+        DockerImageName.parse("consul").withTag("1.14.3"))
+        .withExposedPorts(8500);
+    consulContainer.start();
+  }
+
+  protected ConfigItem makeConfigItem(String key, String value) {
+    var item = new ConfigItem();
+    item.setKey(key);
+    item.setValue(value);
+    item.setPassword(false);
+
+    return item;
+  }
+
+  protected Environment mockEnvironment() {
+    var envMock = mock(Environment.class);
+    var hostVariableMock = mock(StringEnvironmentVariable.class);
+    var portVariableMock = mock(IntEnvironmentVariable.class);
+    var consulLocationMock = mock(StringEnvironmentVariable.class);
+    var spDebugMock = mock(BooleanEnvironmentVariable.class);
+
+    Mockito.when(hostVariableMock.getValueOrDefault()).thenReturn(consulContainer.getHost());
+    Mockito.when(portVariableMock.getValueOrDefault()).thenReturn(consulContainer.getMappedPort(8500));
+    Mockito.when(consulLocationMock.exists()).thenReturn(false);
+    Mockito.when(spDebugMock.exists()).thenReturn(false);
+    Mockito.when(spDebugMock.getValueOrReturn(false)).thenReturn(false);
+
+    Mockito.when(envMock.getConsulHost()).thenReturn(hostVariableMock);
+    Mockito.when(envMock.getConsulPort()).thenReturn(portVariableMock);
+    Mockito.when(envMock.getConsulLocation()).thenReturn(consulLocationMock);
+    Mockito.when(envMock.getSpDebug()).thenReturn(spDebugMock);
+
+    return envMock;
+
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
new file mode 100644
index 000000000..5b14c0a63
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import com.google.gson.Gson;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsulKvManagementTest extends AbstractConsulTest {
+
+  private ISpKvManagement kvManagement;
+
+  @Before
+  public void before() {
+    var environment = mockEnvironment();
+    this.kvManagement = SpServiceDiscovery
+        .getKeyValueStore(environment);
+  }
+
+  @Test
+  public void testKv() {
+    var test1Key = "a/test";
+    var test2Key = "a/test2";
+    var configItem1 = makeConfigItem(test1Key, "abc");
+    var configItem2 = makeConfigItem(test2Key, "def");
+    kvManagement.updateConfig(test1Key, serialize(configItem1), false);
+    kvManagement.updateConfig(test2Key, serialize(configItem2), false);
+
+    Map<String, String> result = kvManagement.getKeyValue("a");
+
+    assertEquals(2, result.size());
+    assertEquals(serialize(configItem1), result.get(test1Key));
+    assertEquals(serialize(configItem2), result.get(test2Key));
+  }
+
+  private String serialize(ConfigItem item) {
+    return new Gson().toJson(item);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
new file mode 100644
index 000000000..7b64d3d39
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsulSpConfigTest extends AbstractConsulTest {
+
+  private SpConfig spConfig;
+  @Before
+  public void before() {
+    var env = mockEnvironment();
+    this.spConfig = SpServiceDiscovery
+        .getSpConfig("test-service-group", env);
+  }
+
+  @Test
+  public void testKvString() {
+
+    spConfig.register("str", "abc", "desc");
+    var value = spConfig.getString("str");
+    assertEquals("abc", value);
+  }
+
+  @Test
+  public void testKvInteger() {
+    spConfig.register("int", 2, "desc");
+    var value = spConfig.getInteger("int");
+    assertEquals(2, value);
+  }
+
+  @Test
+  public void testKvConfigItem() {
+    var configItem = makeConfigItem("config", "value");
+    spConfig.register(configItem);
+    var value = spConfig.getConfigItem("config");
+    assertEquals(configItem, value);
+  }
+
+  @Test
+  public void testUpdateValue() {
+    spConfig.register("stru", "abc", "desc");
+    var value = spConfig.getString("stru");
+    assertEquals("abc", value);
+    spConfig.setString("stru", "abc2");
+    var result = spConfig.getString("stru");
+    assertEquals("abc2", result);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
new file mode 100644
index 000000000..0a4093c30
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ConsulSpServiceDiscoveryTest extends AbstractConsulTest {
+
+  private ISpServiceDiscovery serviceDiscovery;
+
+  @Before
+  public void before() {
+    var env = mockEnvironment();
+    this.serviceDiscovery = SpServiceDiscovery
+        .getServiceDiscovery(env);
+  }
+
+  @Test
+  public void testServiceRegistration() {
+
+    serviceDiscovery.registerService(makeRequest(makeServiceTag(), "abc"));
+
+    var endpoints = getEndpoints(false);
+    assertEquals(1, endpoints.size());
+
+    serviceDiscovery.deregisterService("abc");
+
+    endpoints = getEndpoints(false);
+    assertEquals(0, endpoints.size());
+  }
+
+  @Test
+  public void testGetExtensionsServiceGroups() {
+    serviceDiscovery.registerService(makeRequest(List.of(DefaultSpServiceTags.PE, groupTag()), "abc"));
+    serviceDiscovery.registerService(makeRequest(
+        List.of(DefaultSpServiceTags.CORE, groupTag()), "def"));
+
+    var serviceGroups = serviceDiscovery.getExtensionsServiceGroups();
+
+    assertEquals(1, serviceGroups.size());
+    assertTrue(serviceGroups.containsKey("my-group"));
+
+    serviceDiscovery.deregisterService("abc");
+    serviceDiscovery.deregisterService("def");
+  }
+
+  private List<String> getEndpoints(boolean healthy) {
+    return serviceDiscovery.getServiceEndpoints(
+        DefaultSpServiceGroups.EXT,
+        healthy,
+        List.of(makeServiceTag().get(0).asString()));
+  }
+
+  private SpServiceRegistrationRequest makeRequest(List<SpServiceTag> serviceTags,
+                                                   String serviceId) {
+    var req = new SpServiceRegistrationRequest(
+        DefaultSpServiceGroups.EXT,
+        serviceId,
+        "localhost",
+        80,
+        serviceTags,
+        "/"
+    );
+
+    return req;
+  }
+
+  private SpServiceTag groupTag() {
+    return SpServiceTag.create(SpServiceTagPrefix.SP_GROUP, "my-group");
+  }
+
+  private List<SpServiceTag> makeServiceTag() {
+    return List.of(DefaultSpServiceTags.PE);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/resources/logback-test.xml b/streampipes-integration-tests/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..d47f8f1cd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<configuration debug="false" scan="true">
+
+    <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+</configuration>
diff --git a/streampipes-sdk/pom.xml b/streampipes-sdk/pom.xml
index 0f7b7aaf3..889aa9e5d 100644
--- a/streampipes-sdk/pom.xml
+++ b/streampipes-sdk/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
 
         <!-- External dependencies -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.github.drapostolos</groupId>
             <artifactId>type-parser</artifactId>
@@ -71,4 +75,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
index 6c7583df1..4b859f3c6 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
@@ -21,8 +21,6 @@ import java.util.Map;
 
 public interface ISpKvManagement {
 
-  <T> T getValueForRoute(String route, Class<T> type);
-
   Map<String, String> getKeyValue(String route);
 
   void updateConfig(String key, String entry, boolean password);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
index c3dcc0415..803f4e43b 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.svcdiscovery.api.model;
 
+import java.util.Objects;
+
 public class ConfigItem {
 
   private String key;
@@ -117,4 +119,26 @@ public class ConfigItem {
   public void setConfigurationScope(ConfigurationScope configurationScope) {
     this.configurationScope = configurationScope;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConfigItem that = (ConfigItem) o;
+    return isPassword == that.isPassword
+        && Objects.equals(key, that.key)
+        && Objects.equals(description, that.description)
+        && Objects.equals(value, that.value)
+        && Objects.equals(valueType, that.valueType)
+        && configurationScope == that.configurationScope;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, description, value, valueType, configurationScope, isPassword);
+  }
 }
diff --git a/streampipes-service-discovery-consul/pom.xml b/streampipes-service-discovery-consul/pom.xml
index b1cedc09e..a10cad9ef 100644
--- a/streampipes-service-discovery-consul/pom.xml
+++ b/streampipes-service-discovery-consul/pom.xml
@@ -46,8 +46,13 @@
 
         <!-- third-party dependencies -->
         <dependency>
-            <groupId>com.orbitz.consul</groupId>
-            <artifactId>consul-client</artifactId>
+            <groupId>com.ecwid.consul</groupId>
+            <artifactId>consul-api</artifactId>
+            <version>1.4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
index aff0f18c3..d89b387ac 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
@@ -17,17 +17,19 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+import org.apache.streampipes.commons.environment.Environment;
+
+import com.ecwid.consul.v1.ConsulClient;
 
 public abstract class AbstractConsulService {
 
-  protected ConsulHealthServiceManager consul;
+  private final Environment environment;
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
+  public AbstractConsulService(Environment environment) {
+    this.environment = environment;
   }
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
+  public ConsulClient consulInstance() {
+    return ConsulProvider.INSTANCE.getConsulInstance(environment);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
index d8d2d642b..cb1710483 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
@@ -17,90 +17,64 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.commons.environment.Environment;
 
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.HealthClient;
-import com.orbitz.consul.cache.ServiceHealthCache;
-import com.orbitz.consul.model.health.ServiceHealth;
-import com.orbitz.consul.option.QueryOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.Check;
+import com.ecwid.consul.v1.health.model.HealthService;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public enum ConsulHealthServiceManager {
+public class ConsulHealthServiceManager extends AbstractConsulService {
 
-  INSTANCE;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ConsulHealthServiceManager.class);
   private static final int MAX_RETRIES = 3;
-  private final Consul client;
-  private final Map<String, ServiceHealthCache> serviceHealthCaches;
 
-  ConsulHealthServiceManager() {
-    serviceHealthCaches = new HashMap<>();
-    client = new ConsulProvider().consulInstance();
-    initializeAll();
+  public ConsulHealthServiceManager(Environment environment) {
+    super(environment);
+    initializeCache();
   }
 
-  public void initializeAll() {
-    initialize(DefaultSpServiceGroups.CORE);
-    initialize(DefaultSpServiceGroups.EXT);
-  }
+  public void initializeCache() {
 
-  public void initialize(String serviceGroup) {
-    HealthClient healthClient = client.healthClient();
-    ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, serviceGroup, false, 9, QueryOptions.BLANK);
-    svHealth.start();
-    try {
-      svHealth.awaitInitialized(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    serviceHealthCaches.put(serviceGroup, svHealth);
   }
 
   public List<String> getServiceEndpoints(String serviceGroup,
                                           boolean restrictToHealthy,
                                           List<String> filterByTags) {
-    List<ServiceHealth> activeServices = findService(serviceGroup, 0);
+    List<HealthService> activeServices = findService(serviceGroup, 0);
 
     return activeServices
         .stream()
         .filter(service -> allFiltersSupported(service, filterByTags))
         .filter(service -> !restrictToHealthy
-            || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
+            || service.getChecks().stream().allMatch(check -> check.getStatus() == Check.CheckStatus.PASSING))
         .map(this::makeServiceUrl)
         .collect(Collectors.toList());
   }
 
-  private String makeServiceUrl(ServiceHealth service) {
+  private String makeServiceUrl(HealthService service) {
     return service.getService().getAddress() + ":" + service.getService().getPort();
   }
 
-  private boolean allFiltersSupported(ServiceHealth service,
+  private boolean allFiltersSupported(HealthService service,
                                       List<String> filterByTags) {
-    return service.getService().getTags().containsAll(filterByTags);
+    return new HashSet<>(service.getService().getTags()).containsAll(filterByTags);
   }
 
-  private List<ServiceHealth> findService(String serviceGroup, int retryCount) {
-
-    if (serviceHealthCaches.containsKey(serviceGroup)
-        && serviceHealthCaches.get(serviceGroup).getMap() != null) {
-      ServiceHealthCache cache = serviceHealthCaches.get(serviceGroup);
-      return cache
-          .getMap()
-          .values()
-          .stream()
-          .filter((value) -> value.getService().getService().equals(serviceGroup))
-          .collect(Collectors.toList());
-    } else {
+  private List<HealthService> findService(String serviceGroup, int retryCount) {
+    HealthServicesRequest request = HealthServicesRequest.newBuilder()
+        .setPassing(true)
+        .setQueryParams(QueryParams.DEFAULT)
+        .build();
+    Response<List<HealthService>> healthyServicesResp = consulInstance().getHealthServices(serviceGroup, request);
+    var healthyServices = healthyServicesResp.getValue();
+    if (healthyServices.size() == 0) {
       if (retryCount < MAX_RETRIES) {
         try {
           retryCount++;
@@ -113,10 +87,10 @@ public enum ConsulHealthServiceManager {
       } else {
         return Collections.emptyList();
       }
+    } else {
+      return healthyServices;
     }
   }
 
-  public Consul consulInstance() {
-    return this.client;
-  }
+
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index 69654eaba..cd7282d32 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -18,9 +18,9 @@
 package org.apache.streampipes.svcdiscovery.consul;
 
 import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
 
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.ConsulClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,47 +29,52 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.concurrent.TimeUnit;
 
-public class ConsulProvider {
+public enum ConsulProvider {
+
+  INSTANCE;
 
   private static final Logger LOG = LoggerFactory.getLogger(ConsulProvider.class);
   private static final int CHECK_INTERVAL = 1;
 
-  private final String consulHost;
-  private final String consulUrlString;
-  private final int consulPort;
+  private ConsulClient consulClient;
+  private boolean initialized = false;
 
-  public ConsulProvider() {
-    this.consulHost = getConsulHost();
-    this.consulPort = getConsulPort();
-    this.consulUrlString = makeConsulUrl();
+  ConsulProvider() {
   }
 
-  public Consul consulInstance() {
-    boolean connected;
+  public ConsulClient getConsulInstance(Environment environment) {
+    if (!initialized) {
+      createConsulInstance(environment);
+    }
 
-    do {
-      LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
-      connected = checkConsulAvailable();
+    return consulClient;
+  }
 
-      if (!connected) {
-        LOG.info("Retrying in {} second", CHECK_INTERVAL);
-        try {
-          TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    } while (!connected);
+  private void createConsulInstance(Environment environment) {
+    var consulHost = getConsulHost(environment);
+    var consulPort = getConsulPort(environment);
+    var connected = false;
 
-    LOG.info("Successfully connected to Consul");
-    return Consul.builder().withUrl(consulUrlString).build();
-  }
+    LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
+    connected = checkConsulAvailable(consulHost, consulPort);
 
-  public String makeConsulUrl() {
-    return "http://" + consulHost + ":" + consulPort;
+    if (connected) {
+      LOG.info("Successfully connected to Consul on host {}", consulHost);
+      this.consulClient = new ConsulClient(consulHost, consulPort);
+      this.initialized = true;
+    } else {
+      LOG.info("Retrying in {} second", CHECK_INTERVAL);
+      try {
+        TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
+        createConsulInstance(environment);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
-  private boolean checkConsulAvailable() {
+  private boolean checkConsulAvailable(String consulHost,
+                                       int consulPort) {
     try {
       InetSocketAddress sa = new InetSocketAddress(consulHost, consulPort);
       Socket ss = new Socket();
@@ -83,18 +88,18 @@ public class ConsulProvider {
     }
   }
 
-  private int getConsulPort() {
-    return Envs.SP_CONSUL_PORT.getValueAsIntOrDefault(DefaultEnvValues.CONSUL_PORT_DEFAULT);
+  private int getConsulPort(Environment environment) {
+    return environment.getConsulPort().getValueOrDefault();
   }
 
-  private String getConsulHost() {
-    if (Envs.SP_CONSUL_LOCATION.exists()) {
-      return Envs.SP_CONSUL_LOCATION.getValue();
+  private String getConsulHost(Environment environment) {
+    if (environment.getConsulLocation().exists()) {
+      return environment.getConsulLocation().getValue();
     } else {
-      if (Envs.SP_DEBUG.getValueAsBooleanOrDefault(false)) {
+      if (environment.getSpDebug().getValueOrReturn(false)) {
         return DefaultEnvValues.CONSUL_HOST_LOCAL;
       } else {
-        return Envs.SP_CONSUL_HOST.getValueOrDefault(DefaultEnvValues.CONSUL_HOST_DEFAULT);
+        return environment.getConsulHost().getValueOrDefault();
       }
     }
   }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index 9f3f6ddec..73770eeea 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -18,35 +18,30 @@
 
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItemUtils;
 import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
 
+import com.ecwid.consul.v1.ConsulClient;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Optional;
-
 public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
   private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
   private static final String SLASH = "/";
   private final String serviceName;
-  private final KeyValueClient kvClient;
-
-  // TODO Implement mechanism to update the client when some configuration parameters change in Consul
-  private Map<String, Object> configProps;
+  private final ConsulClient kvClient;
 
-  public ConsulSpConfig(String serviceName) {
-    Consul consul = consulInstance();
-    this.kvClient = consul.keyValueClient();
+  public ConsulSpConfig(String serviceName,
+                        Environment environment) {
+    super(environment);
+    this.kvClient = consulInstance();
     this.serviceName = serviceName;
   }
 
@@ -82,9 +77,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public void registerObject(String key, Object defaultValue, String description) {
-    Optional<String> i = kvClient.getValueAsString(addSn(key));
-    if (!i.isPresent()) {
-      kvClient.putValue(addSn(key), toJson(defaultValue));
+    var i = kvClient.getKVValue(addSn(key));
+    if (i.getValue() == null) {
+      kvClient.setKVValue(addSn(key), toJson(defaultValue));
     }
   }
 
@@ -96,28 +91,28 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
   @Override
   public void register(ConfigItem configItem) {
     String key = addSn(configItem.getKey());
-    Optional<String> i = kvClient.getValueAsString(key);
+    var i = kvClient.getKVValue(key);
 
-    if (!i.isPresent()) {
+    if (i.getValue() == null) {
       // Set the value of environment variable as default
       String envVariable = System.getenv(configItem.getKey());
       if (envVariable != null) {
         configItem.setValue(envVariable);
-        kvClient.putValue(key, toJson(configItem));
+        kvClient.setKVValue(key, toJson(configItem));
       } else {
-        kvClient.putValue(key, toJson(configItem));
+        kvClient.setKVValue(key, toJson(configItem));
       }
     }
   }
 
-  private void register(String key, String defaultValue, String valueType, String description,
-                        ConfigurationScope configurationScope, boolean isPassword) {
+  private void register(String key,
+                        String defaultValue,
+                        String valueType,
+                        String description,
+                        ConfigurationScope configurationScope,
+                        boolean isPassword) {
     ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
     register(configItem);
-
-    if (configProps != null) {
-      configProps.put(key, getString(key));
-    }
   }
 
   @Override
@@ -142,10 +137,10 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
-    Optional<String> os = kvClient.getValueAsString(addSn(key));
-    if (os.isPresent()) {
+    var os = kvClient.getKVValue(addSn(key));
+    if (os.getValue() != null) {
       try {
-        return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
+        return JacksonSerializer.getObjectMapper().readValue(os.getValue().getDecodedValue(), clazz);
       } catch (JsonProcessingException e) {
         LOG.info("Could not deserialize object", e);
         return defaultValue;
@@ -157,9 +152,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public ConfigItem getConfigItem(String key) {
-    Optional<String> os = kvClient.getValueAsString(addSn(key));
+    var os = kvClient.getKVValue(addSn(key)).getValue();
 
-    return fromJson(os.get());
+    return fromJson(os.getDecodedValue());
   }
 
   @Override
@@ -179,12 +174,12 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
 
   @Override
   public void setString(String key, String value) {
-    kvClient.putValue(addSn(key), value);
+    kvClient.setKVValue(addSn(key), value);
   }
 
   @Override
   public void setObject(String key, Object value) {
-    kvClient.putValue(addSn(key), toJson(value));
+    kvClient.setKVValue(addSn(key), toJson(value));
   }
 
   private String addSn(String key) {
@@ -202,7 +197,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
     }
   }
 
-  private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope,
+  private ConfigItem prepareConfigItem(String valueType,
+                                       String description,
+                                       ConfigurationScope configurationScope,
                                        boolean password) {
     ConfigItem configItem = new ConfigItem();
     configItem.setValueType(valueType);
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
similarity index 61%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
index aff0f18c3..1020553cd 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
@@ -15,19 +15,30 @@
  * limitations under the License.
  *
  */
+
 package org.apache.streampipes.svcdiscovery.consul;
 
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 
-public abstract class AbstractConsulService {
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-  protected ConsulHealthServiceManager consul;
+public enum ServiceCache {
 
-  public AbstractConsulService() {
-    this.consul = ConsulHealthServiceManager.INSTANCE;
-  }
+  INSTANCE;
+
+  private LoadingCache<String, List<HealthService>> cache;
+
+  ServiceCache() {
 
-  public Consul consulInstance() {
-    return this.consul.consulInstance();
   }
+
+
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
index 0138114a5..31247feca 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
@@ -17,15 +17,11 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
-import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
-import com.orbitz.consul.model.ConsulResponse;
-import com.orbitz.consul.model.kv.Value;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,47 +29,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagement {
+public class SpConsulKvManagement extends AbstractConsulService implements ISpKvManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(SpConsulKvManagement.class);
 
   private static final String CONSUL_NAMESPACE = "/sp/v1/";
 
-  public <T> T getValueForRoute(String route, Class<T> type) {
-    try {
-      String entry = getKeyValue(route)
-          .values()
-          .stream()
-          .findFirst()
-          .orElse(null);
-
-      if (type.equals(Integer.class)) {
-        return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      } else if (type.equals(Boolean.class)) {
-        return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      } else {
-        return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
-      }
-    } catch (JsonProcessingException e) {
-      e.printStackTrace();
-    }
-    throw new IllegalArgumentException("Cannot get entry from Consul");
+  public SpConsulKvManagement(Environment environment) {
+    super(environment);
   }
 
   public Map<String, String> getKeyValue(String route) {
-    Consul consul = consulInstance();
-    KeyValueClient keyValueClient = consul.keyValueClient();
+    var consul = consulInstance();
 
     Map<String, String> keyValues = new HashMap<>();
+    Response<List<GetValue>> consulResponseWithValues = consul.getKVValues(route);
 
-    ConsulResponse<List<Value>> consulResponseWithValues = keyValueClient.getConsulResponseWithValues(route);
-
-    if (consulResponseWithValues.getResponse() != null) {
-      for (Value value : consulResponseWithValues.getResponse()) {
+    if (consulResponseWithValues.getValue() != null) {
+      for (GetValue value : consulResponseWithValues.getValue()) {
         String key = value.getKey();
         String v = "";
-        if (value.getValueAsString().isPresent()) {
-          v = value.getValueAsString().get();
+        if (value.getValue() != null) {
+          v = value.getDecodedValue();
         }
         keyValues.put(key, v);
       }
@@ -82,16 +59,16 @@ public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagem
   }
 
   public void updateConfig(String key, String entry, boolean password) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     if (!password) {
       LOG.info("Updated config - key:" + key + " value: " + entry);
-      consul.keyValueClient().putValue(key, entry);
+      consul.setKVValue(key, entry);
     }
   }
 
   public void deleteConfig(String key) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     LOG.info("Delete config: {}", key);
-    consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
+    consul.deleteKVValue(CONSUL_NAMESPACE + key);
   }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 8156e2545..a00acf575 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.svcdiscovery.consul;
 
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
@@ -24,13 +25,10 @@ import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationReques
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
-import com.orbitz.consul.AgentClient;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.model.agent.ImmutableRegCheck;
-import com.orbitz.consul.model.agent.ImmutableRegistration;
-import com.orbitz.consul.model.agent.Registration;
-import com.orbitz.consul.model.health.HealthCheck;
-import com.orbitz.consul.model.health.Service;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.Check;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.agent.model.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +47,16 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   private static final String COLON = ":";
   private static final String HEALTH_CHECK_INTERVAL = "10s";
 
+  private ConsulHealthServiceManager healthServiceManager;
+
+  public SpConsulServiceDiscovery(Environment environment) {
+    super(environment);
+    this.healthServiceManager = new ConsulHealthServiceManager(environment);
+  }
+
   @Override
   public void registerService(SpServiceRegistrationRequest req) {
-    consulInstance().agentClient().register((createRegistrationBody(req)));
+    consulInstance().agentServiceRegister(createRegistrationBody(req));
     LOG.info("Successfully registered service at Consul: " + req.getSvcId());
   }
 
@@ -75,30 +80,32 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
 
   @Override
   public List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy, List<String> filterByTags) {
-    return ConsulHealthServiceManager.INSTANCE.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
+    return healthServiceManager.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
   }
 
   @Override
   public Map<String, String> getExtensionsServiceGroups() {
     LOG.info("Load pipeline element service status");
-    Consul consul = consulInstance();
-    AgentClient agent = consul.agentClient();
+    var consul = consulInstance();
 
-    Map<String, Service> services = consul.agentClient().getServices();
-    Map<String, HealthCheck> checks = agent.getChecks();
+    Response<Map<String, Service>> servicesResp = consul.getAgentServices();
+    Response<Map<String, Check>> checksResp = consul.getAgentChecks();
 
     Map<String, String> peSvcs = new HashMap<>();
-
-    for (Map.Entry<String, Service> entry : services.entrySet()) {
-      if (hasExtensionsTag(entry.getValue().getTags())) {
-        String serviceId = entry.getValue().getId();
-        String serviceStatus = "critical";
-        if (checks.containsKey("service:" + entry.getKey())) {
-          serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
+    if (servicesResp.getValue() != null) {
+      var services = servicesResp.getValue();
+      var checks = checksResp.getValue();
+      for (Map.Entry<String, Service> entry : services.entrySet()) {
+        if (hasExtensionsTag(entry.getValue().getTags())) {
+          String serviceId = entry.getValue().getId();
+          String serviceStatus = "critical";
+          if (checks.containsKey("service:" + entry.getKey())) {
+            serviceStatus = checks.get("service:" + entry.getKey()).getStatus().name();
+          }
+          LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
+          String serviceGroup = extractServiceGroup(entry.getValue().getTags());
+          peSvcs.put(serviceGroup, serviceStatus);
         }
-        LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
-        String serviceGroup = extractServiceGroup(entry.getValue().getTags());
-        peSvcs.put(serviceGroup, serviceStatus);
       }
     }
     return peSvcs;
@@ -117,25 +124,33 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
 
   @Override
   public void deregisterService(String svcId) {
-    Consul consul = consulInstance();
+    var consul = consulInstance();
     LOG.info("Deregister service: " + svcId);
-    consul.agentClient().deregister(svcId);
+    consul.agentServiceDeregister(svcId);
+  }
+
+  private NewService createRegistrationBody(SpServiceRegistrationRequest req) {
+    var service = new NewService();
+    service.setId(req.getSvcId());
+    service.setName(req.getSvcGroup());
+    service.setPort(req.getPort());
+    service.setAddress(HTTP_PROTOCOL + req.getHost());
+    service.setCheck(createServiceCheck(req));
+
+    service.setTags(asString(req.getTags()));
+    service.setEnableTagOverride(true);
+
+    return service;
   }
 
-  private Registration createRegistrationBody(SpServiceRegistrationRequest req) {
-    return ImmutableRegistration.builder()
-        .id(req.getSvcId())
-        .name(req.getSvcGroup())
-        .port(req.getPort())
-        .address(HTTP_PROTOCOL + req.getHost())
-        .check(ImmutableRegCheck.builder()
-            .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
-            .interval(HEALTH_CHECK_INTERVAL)
-            .deregisterCriticalServiceAfter("120s")
-            .status("passing")
-            .build())
-        .tags(asString(req.getTags()))
-        .enableTagOverride(true)
-        .build();
+  private NewService.Check createServiceCheck(SpServiceRegistrationRequest req) {
+    var serviceCheck = new NewService.Check();
+
+    serviceCheck.setHttp(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath());
+    serviceCheck.setInterval(HEALTH_CHECK_INTERVAL);
+    serviceCheck.setDeregisterCriticalServiceAfter("120s");
+    serviceCheck.setStatus("passing");
+
+    return serviceCheck;
   }
 }
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
index 8f836b077..cead1334b 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.svcdiscovery;
 
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
@@ -27,15 +29,28 @@ import org.apache.streampipes.svcdiscovery.consul.SpConsulServiceDiscovery;
 public class SpServiceDiscovery {
 
   public static ISpServiceDiscovery getServiceDiscovery() {
-    return new SpConsulServiceDiscovery();
+    return new SpConsulServiceDiscovery(Environments.getEnvironment());
+  }
+
+  public static ISpServiceDiscovery getServiceDiscovery(Environment environment) {
+    return new SpConsulServiceDiscovery(environment);
   }
 
   public static ISpKvManagement getKeyValueStore() {
-    return new SpConsulKvManagement();
+    return new SpConsulKvManagement(Environments.getEnvironment());
+  }
+
+  public static ISpKvManagement getKeyValueStore(Environment environment) {
+    return new SpConsulKvManagement(environment);
   }
 
   public static SpConfig getSpConfig(String serviceGroup) {
-    return new ConsulSpConfig(serviceGroup);
+    return getSpConfig(serviceGroup, Environments.getEnvironment());
+  }
+
+  public static SpConfig getSpConfig(String serviceGroup,
+                                     Environment environment) {
+    return new ConsulSpConfig(serviceGroup, environment);
   }
 
 }
diff --git a/streampipes-storage-couchdb/pom.xml b/streampipes-storage-couchdb/pom.xml
index 4a0c28cdb..f0346cd60 100644
--- a/streampipes-storage-couchdb/pom.xml
+++ b/streampipes-storage-couchdb/pom.xml
@@ -56,6 +56,10 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.lightcouch</groupId>
             <artifactId>lightcouch</artifactId>


[streampipes] 04/05: Bump consul version to 1.14.3 (#1157)

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ca9d9b33b18fa4afb4235cd7432c750b450e5e8e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Jan 27 09:20:18 2023 +0100

    Bump consul version to 1.14.3 (#1157)
---
 installer/compose/docker-compose.full.yml                      | 2 +-
 installer/compose/docker-compose.nats.yml                      | 2 +-
 installer/compose/docker-compose.yml                           | 2 +-
 installer/k8s/templates/external/consul/consul-deployment.yaml | 2 +-
 installer/k8s/values.yaml                                      | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 83bf879a4..687719e86 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -82,7 +82,7 @@ services:
       spnet:
 
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
       - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
       - "CONSUL_BIND_INTERFACE=eth0"
diff --git a/installer/compose/docker-compose.nats.yml b/installer/compose/docker-compose.nats.yml
index ba115d92d..e977e333e 100644
--- a/installer/compose/docker-compose.nats.yml
+++ b/installer/compose/docker-compose.nats.yml
@@ -55,7 +55,7 @@ services:
       spnet:
 
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
       - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
       - "CONSUL_BIND_INTERFACE=eth0"
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index a5d7e4727..0657b8601 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -54,7 +54,7 @@ services:
       spnet:
 
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
       - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
       - "CONSUL_BIND_INTERFACE=eth0"
diff --git a/installer/k8s/templates/external/consul/consul-deployment.yaml b/installer/k8s/templates/external/consul/consul-deployment.yaml
index b80fc11aa..18fd60801 100644
--- a/installer/k8s/templates/external/consul/consul-deployment.yaml
+++ b/installer/k8s/templates/external/consul/consul-deployment.yaml
@@ -33,7 +33,7 @@ spec:
             claimName: consul-pvc
       containers:
         - name: consul
-          image: fogsyio/consul:{{ .Values.external.consulVersion }}
+          image: consul:{{ .Values.external.consulVersion }}
           imagePullPolicy: {{ .Values.pullPolicy }}
           command:
             - "/bin/sh"
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index cea3b6fbf..d322e36cf 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -25,7 +25,7 @@ streampipes:
   registry: "apachestreampipes"
 
 external:
-  consulVersion: 1.9.6
+  consulVersion: 1.14.3
   couchdbVersion: 2.3.1
   flinkVersion: 1.13.5-scala_2.11
   kafkaVersion: 2.2.0


[streampipes] 05/05: [hotfix] Update Consul version in root docker-compose.yml

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 00a043c3fbc79d64232b80963a6ed0a33da98bd1
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Fri Jan 27 10:41:43 2023 +0100

    [hotfix] Update Consul version in root docker-compose.yml
---
 docker-compose.yml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 96be1ae7b..e324ecf82 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -76,11 +76,13 @@ services:
       spnet:
 
   consul:
-    image: fogsyio/consul:1.9.6
+    image: consul:1.14.3
     environment:
-      - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
+      - "CONSUL_LOCAL_CONFIG={\"rpc_streaming\": false, \"disable_update_check\": true, \"rpc\": {\"enable_streaming\": false}, \"use_streaming_backend\": false}"
       - "CONSUL_BIND_INTERFACE=eth0"
       - "CONSUL_HTTP_ADDR=0.0.0.0"
+      - "CONSUL_RPC_ENABLE_STREAMING=false"
+      - "CONSUL_USE_STREAMING_BACKEND=false"
     entrypoint:
       - consul
       - agent