You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/01/27 21:08:29 UTC
[streampipes] 01/05: Replace consul-client with consul-api library (#1158)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit e02d3bf5ea454e02f355e97599013cc637f7dc10
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 26 22:03:31 2023 +0100
Replace consul-client with consul-api library (#1158)
---
.../deploy/standalone/consul/docker-compose.yml | 6 +-
...ive-data-from-the-streampipes-data-stream.ipynb | 2 +-
.../apache/streampipes/commons/constants/Envs.java | 49 +++++-----
.../commons/environment/DefaultEnvironment.java | 47 +++++++++
.../commons/environment/Environment.java | 17 ++--
.../commons/environment/Environments.java | 16 +---
.../variable/BooleanEnvironmentVariable.java | 16 ++--
.../commons/environment/variable/EnvResolver.java | 14 +--
.../environment/variable/EnvironmentVariable.java | 62 ++++++++++++
.../variable/IntEnvironmentVariable.java | 17 ++--
.../variable/StringEnvironmentVariable.java | 16 ++--
.../management/config/ConfigExtractor.java | 4 +-
streampipes-integration-tests/pom.xml | 11 +++
.../svcdiscovery/AbstractConsulTest.java | 74 +++++++++++++++
.../svcdiscovery/ConsulKvManagementTest.java | 63 +++++++++++++
.../svcdiscovery/ConsulSpConfigTest.java | 71 ++++++++++++++
.../svcdiscovery/ConsulSpServiceDiscoveryTest.java | 105 +++++++++++++++++++++
.../src/test/resources/logback-test.xml | 34 +++++++
streampipes-sdk/pom.xml | 6 +-
.../svcdiscovery/api/ISpKvManagement.java | 2 -
.../svcdiscovery/api/model/ConfigItem.java | 24 +++++
streampipes-service-discovery-consul/pom.xml | 9 +-
.../svcdiscovery/consul/AbstractConsulService.java | 14 +--
.../consul/ConsulHealthServiceManager.java | 82 ++++++----------
.../svcdiscovery/consul/ConsulProvider.java | 79 ++++++++--------
.../svcdiscovery/consul/ConsulSpConfig.java | 63 ++++++-------
...bstractConsulService.java => ServiceCache.java} | 27 ++++--
.../svcdiscovery/consul/SpConsulKvManagement.java | 55 ++++-------
.../consul/SpConsulServiceDiscovery.java | 95 +++++++++++--------
.../svcdiscovery/SpServiceDiscovery.java | 21 ++++-
streampipes-storage-couchdb/pom.xml | 4 +
31 files changed, 799 insertions(+), 306 deletions(-)
diff --git a/installer/cli/deploy/standalone/consul/docker-compose.yml b/installer/cli/deploy/standalone/consul/docker-compose.yml
index 49dca0e95..dd26e0af9 100644
--- a/installer/cli/deploy/standalone/consul/docker-compose.yml
+++ b/installer/cli/deploy/standalone/consul/docker-compose.yml
@@ -16,11 +16,13 @@
version: "3.4"
services:
consul:
- image: fogsyio/consul:1.9.6
+ image: consul:1.14.3
environment:
- - "CONSUL_LOCAL_CONFIG={\"disable_update_check\": true}"
+ - "CONSUL_LOCAL_CONFIG={\"rpc_streaming\": false, \"disable_update_check\": true, \"rpc\": {\"enable_streaming\": false}, \"use_streaming_backend\": false}"
- "CONSUL_BIND_INTERFACE=eth0"
- "CONSUL_HTTP_ADDR=0.0.0.0"
+ - "CONSUL_RPC_ENABLE_STREAMING=false"
+ - "CONSUL_USE_STREAMING_BACKEND=false"
entrypoint:
- consul
- agent
diff --git a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 3b69c065d..3f3a3c9c3 100644
--- a/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/examples/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -610,4 +610,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
-}
+}
\ No newline at end of file
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 9ad589bb6..546050058 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -19,35 +19,37 @@ package org.apache.streampipes.commons.constants;
public enum Envs {
- SP_HOST("SP_HOST"),
- SP_PORT("SP_PORT"),
+ SP_HOST("SP_HOST", null),
+ SP_PORT("SP_PORT", null),
@Deprecated(since = "0.90.0", forRemoval = true)
- SP_CONSUL_LOCATION("CONSUL_LOCATION"),
+ SP_CONSUL_LOCATION("CONSUL_LOCATION", "consul"),
- SP_CONSUL_HOST("SP_CONSUL_HOST"),
- SP_CONSUL_PORT("SP_CONSUL_PORT"),
- SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS"),
- SP_JWT_SECRET("JWT_SECRET"),
- SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"),
- SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"),
- SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC"),
- SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL"),
- SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD"),
- SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER"),
- SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET"),
- SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS"),
- SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE"),
- SP_CLIENT_USER("SP_CLIENT_USER"),
- SP_CLIENT_SECRET("SP_CLIENT_SECRET"),
- SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE"),
- SP_DEBUG("SP_DEBUG"),
- SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN");
+ SP_CONSUL_HOST("SP_CONSUL_HOST", "consul"),
+ SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"),
+ SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", null),
+ SP_JWT_SECRET("JWT_SECRET", null),
+ SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null),
+ SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null),
+ SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null),
+ SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null),
+ SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null),
+ SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null),
+ SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null),
+ SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null),
+ SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null),
+ SP_CLIENT_USER("SP_CLIENT_USER", null),
+ SP_CLIENT_SECRET("SP_CLIENT_SECRET", null),
+ SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null),
+ SP_DEBUG("SP_DEBUG", "false"),
+ SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null);
private final String envVariableName;
+ private final String defaultValue;
- Envs(String envVariableName) {
+ Envs(String envVariableName, String defaultValue) {
this.envVariableName = envVariableName;
+ this.defaultValue = defaultValue;
}
public boolean exists() {
@@ -82,4 +84,7 @@ public enum Envs {
return this.exists() ? this.getValue() : defaultValue;
}
+ /**
+ * Returns the raw default configured for this constant.
+ *
+ * @return the default as text, or {@code null} for constants declared without a default
+ */
+ public String getDefaultValue() {
+ return defaultValue;
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
new file mode 100644
index 000000000..0c051ef16
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment;
+
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+
+/**
+ * {@link Environment} implementation in which every accessor wraps the matching {@link Envs} constant.
+ * Each call creates a new variable object.
+ */
+public class DefaultEnvironment implements Environment {
+
+ /** Wraps {@link Envs#SP_CONSUL_HOST}. */
+ @Override
+ public StringEnvironmentVariable getConsulHost() {
+ return new StringEnvironmentVariable(Envs.SP_CONSUL_HOST);
+ }
+
+ /** Wraps {@link Envs#SP_CONSUL_PORT}. */
+ @Override
+ public IntEnvironmentVariable getConsulPort() {
+ return new IntEnvironmentVariable(Envs.SP_CONSUL_PORT);
+ }
+
+ /** Wraps {@link Envs#SP_DEBUG}. */
+ @Override
+ public BooleanEnvironmentVariable getSpDebug() {
+ return new BooleanEnvironmentVariable(Envs.SP_DEBUG);
+ }
+
+ /** Wraps {@link Envs#SP_CONSUL_LOCATION}, which is marked deprecated in {@link Envs}. */
+ @Override
+ public StringEnvironmentVariable getConsulLocation() {
+ return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
+ }
+}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
similarity index 58%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 6c7583df1..b3a3256d2 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -15,18 +15,21 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api;
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
-public interface ISpKvManagement {
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
- <T> T getValueForRoute(String route, Class<T> type);
+/**
+ * Typed access to the environment variables used to locate Consul and to detect debug mode.
+ */
+public interface Environment {
- Map<String, String> getKeyValue(String route);
+ /** @return the variable holding the Consul host */
+ StringEnvironmentVariable getConsulHost();
- void updateConfig(String key, String entry, boolean password);
+ /** @return the variable holding the Consul port */
+ IntEnvironmentVariable getConsulPort();
- void deleteConfig(String key);
+ /** @return the variable holding the debug flag */
+ BooleanEnvironmentVariable getSpDebug();
+ /**
+ * @return the variable holding the legacy Consul location
+ * @deprecated since 0.90.0 and scheduled for removal; presumably superseded by
+ * {@link #getConsulHost()} - TODO confirm
+ */
+ @Deprecated(since = "0.90.0", forRemoval = true)
+ StringEnvironmentVariable getConsulLocation();
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
index 6c7583df1..631e72dcc 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
@@ -15,18 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api;
-import java.util.Map;
+package org.apache.streampipes.commons.environment;
-public interface ISpKvManagement {
-
- <T> T getValueForRoute(String route, Class<T> type);
-
- Map<String, String> getKeyValue(String route);
-
- void updateConfig(String key, String entry, boolean password);
-
- void deleteConfig(String key);
+/**
+ * Static entry point for obtaining an {@link Environment}.
+ */
+public class Environments {
+ /**
+ * @return a new {@link DefaultEnvironment} on every call
+ */
+ public static Environment getEnvironment() {
+ return new DefaultEnvironment();
+ }
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
similarity index 68%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
index aff0f18c3..2c7b3e064 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/BooleanEnvironmentVariable.java
@@ -15,19 +15,19 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
- protected ConsulHealthServiceManager consul;
+/**
+ * Environment variable whose raw text is interpreted as a boolean.
+ */
+public class BooleanEnvironmentVariable extends EnvironmentVariable<Boolean> {
- public AbstractConsulService() {
- this.consul = ConsulHealthServiceManager.INSTANCE;
+ /**
+ * @param envVariable constant supplying the variable name and the raw default value
+ */
+ public BooleanEnvironmentVariable(Envs envVariable) {
+ super(envVariable);
}
- public Consul consulInstance() {
- return this.consul.consulInstance();
+ /**
+ * Converts the raw text into a boolean.
+ *
+ * @param value raw text, may be {@code null}
+ * @return {@code true} only if {@code value} equals "true" ignoring case; {@code false} in every
+ * other case, including {@code null}
+ */
+ @Override
+ public Boolean parse(String value) {
+ // parseBoolean already ignores case and tolerates null, so lower-casing first only added an NPE risk.
+ return Boolean.parseBoolean(value);
}
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
similarity index 72%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
index 6c7583df1..591f0d0f4 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvResolver.java
@@ -15,18 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api;
-import java.util.Map;
+package org.apache.streampipes.commons.environment.variable;
-public interface ISpKvManagement {
-
- <T> T getValueForRoute(String route, Class<T> type);
-
- Map<String, String> getKeyValue(String route);
-
- void updateConfig(String key, String entry, boolean password);
-
- void deleteConfig(String key);
+/**
+ * Callback that supplies a value of type {@code T}; accepted by
+ * {@code EnvironmentVariable#getValueOrResolve}.
+ *
+ * @param <T> type of the supplied value
+ */
+public interface EnvResolver<T> {
+ /** @return the resolved value */
+ T resolve();
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
new file mode 100644
index 000000000..769d5fea9
--- /dev/null
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.environment.variable;
+
+import org.apache.streampipes.commons.constants.CustomEnvs;
+import org.apache.streampipes.commons.constants.Envs;
+
+/**
+ * Typed view on a single environment variable.
+ *
+ * <p>Subclasses implement {@link #parse(String)} to convert the raw text into {@code T}. Raw values
+ * are looked up through {@link CustomEnvs}.
+ *
+ * @param <T> type the raw text is converted to
+ */
+public abstract class EnvironmentVariable<T> {
+
+ private final T defaultValue;
+ private final String envVariableName;
+
+ /**
+ * @param envVariableName name of the environment variable
+ * @param defaultValue already-typed fallback returned by {@link #getValueOrDefault()}, may be {@code null}
+ */
+ public EnvironmentVariable(String envVariableName,
+ T defaultValue) {
+ this.envVariableName = envVariableName;
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Takes the variable name and the default from an {@link Envs} constant.
+ *
+ * <p>A constant without a default yields a {@code null} default here. The {@code null} is not handed
+ * to {@link #parse(String)}, because numeric and boolean parsers may throw on it.
+ *
+ * @param envVariable constant describing the variable
+ */
+ public EnvironmentVariable(Envs envVariable) {
+ this.envVariableName = envVariable.getEnvVariableName();
+ String rawDefault = envVariable.getDefaultValue();
+ this.defaultValue = rawDefault == null ? null : parse(rawDefault);
+ }
+
+ /**
+ * Reads and parses the variable without checking whether it is set.
+ *
+ * @return the parsed value; for an unset variable the outcome depends on what
+ * {@code CustomEnvs.getEnv} returns and on how {@link #parse(String)} handles it
+ */
+ public T getValue() {
+ return parse(CustomEnvs.getEnv(envVariableName));
+ }
+
+ /**
+ * @return whether {@link CustomEnvs} reports the variable as present
+ */
+ public boolean exists() {
+ return CustomEnvs.exists(envVariableName);
+ }
+
+ /**
+ * @return the parsed value if the variable is set, otherwise the default given at construction
+ * (possibly {@code null})
+ */
+ public T getValueOrDefault() {
+ return exists() ? getValue() : defaultValue;
+ }
+
+ /**
+ * @param defaultValue fallback used instead of the default given at construction
+ * @return the parsed value if the variable is set, otherwise {@code defaultValue}
+ */
+ public T getValueOrReturn(T defaultValue) {
+ return exists() ? getValue() : defaultValue;
+ }
+
+ /**
+ * @param resolver supplies the fallback; only invoked when the variable is not set
+ * @return the parsed value if the variable is set, otherwise the resolver's result
+ */
+ public T getValueOrResolve(EnvResolver<T> resolver) {
+ // Prefer the environment value, in line with the other getValueOr* methods.
+ return exists() ? getValue() : resolver.resolve();
+ }
+
+ /**
+ * Converts the raw text of the variable into {@code T}.
+ *
+ * @param value raw text as read from the environment or from an {@link Envs} default
+ * @return the typed value
+ */
+ public abstract T parse(String value);
+
+}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
similarity index 69%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
index aff0f18c3..6bfb35c2d 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/IntEnvironmentVariable.java
@@ -15,19 +15,18 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
- protected ConsulHealthServiceManager consul;
-
- public AbstractConsulService() {
- this.consul = ConsulHealthServiceManager.INSTANCE;
+/**
+ * Environment variable whose raw text is interpreted as an integer.
+ */
+public class IntEnvironmentVariable extends EnvironmentVariable<Integer> {
+ /**
+ * @param envVariable constant supplying the variable name and the raw default value
+ */
+ public IntEnvironmentVariable(Envs envVariable) {
+ super(envVariable);
}
- public Consul consulInstance() {
- return this.consul.consulInstance();
+ /**
+ * Converts the raw text via {@link Integer#parseInt(String)}.
+ *
+ * @param value raw decimal text
+ * @return the parsed integer
+ * @throws NumberFormatException if {@code value} is {@code null} or not a parsable integer
+ */
+ @Override
+ public Integer parse(String value) {
+ return Integer.parseInt(value);
}
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
similarity index 70%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
index aff0f18c3..eaf891b4b 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/StringEnvironmentVariable.java
@@ -15,19 +15,19 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
+package org.apache.streampipes.commons.environment.variable;
-public abstract class AbstractConsulService {
+import org.apache.streampipes.commons.constants.Envs;
- protected ConsulHealthServiceManager consul;
+/**
+ * Environment variable whose raw text is used as-is.
+ */
+public class StringEnvironmentVariable extends EnvironmentVariable<String> {
- public AbstractConsulService() {
- this.consul = ConsulHealthServiceManager.INSTANCE;
+ /**
+ * @param envVariable constant supplying the variable name and the raw default value
+ */
+ public StringEnvironmentVariable(Envs envVariable) {
+ super(envVariable);
}
- public Consul consulInstance() {
- return this.consul.consulInstance();
+ /**
+ * @param value raw text
+ * @return {@code value} unchanged
+ */
+ @Override
+ public String parse(String value) {
+ return value;
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
index 89bc9a700..5c00462a4 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/config/ConfigExtractor.java
@@ -17,8 +17,8 @@
*/
package org.apache.streampipes.extensions.management.config;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.svcdiscovery.consul.ConsulSpConfig;
import java.io.Serializable;
@@ -27,7 +27,7 @@ public class ConfigExtractor implements Serializable {
private SpConfig config;
private ConfigExtractor(String serviceGroup) {
- this.config = new ConsulSpConfig(serviceGroup);
+ // Obtain the config through the SpServiceDiscovery facade instead of instantiating the Consul class directly.
+ this.config = SpServiceDiscovery.getSpConfig(serviceGroup);
}
public static ConfigExtractor from(String serviceGroup) {
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index d576bb508..c4a2588ec 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -56,12 +56,23 @@
<classifier>embed</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-service-discovery</artifactId>
+ <version>0.91.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.4</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>consul</artifactId>
+ <version>1.17.6</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
new file mode 100644
index 000000000..367da32bd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/AbstractConsulTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import org.mockito.Mockito;
+import org.testcontainers.consul.ConsulContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Base class for the Consul integration tests.
+ *
+ * <p>A single Consul container (image {@code consul:1.14.3}, port 8500 exposed) is started in the
+ * static initializer and shared through the static {@code consulContainer} field.
+ */
+public class AbstractConsulTest {
+
+ static final ConsulContainer consulContainer;
+
+ static {
+ consulContainer = new ConsulContainer(
+ DockerImageName.parse("consul").withTag("1.14.3"))
+ .withExposedPorts(8500);
+ consulContainer.start();
+ }
+
+ /**
+ * Builds a non-password {@link ConfigItem}.
+ *
+ * @param key key to set on the item
+ * @param value value to set on the item
+ * @return the populated item with the password flag set to {@code false}
+ */
+ protected ConfigItem makeConfigItem(String key, String value) {
+ var item = new ConfigItem();
+ item.setKey(key);
+ item.setValue(value);
+ item.setPassword(false);
+
+ return item;
+ }
+
+ /**
+ * Creates a Mockito {@link Environment} that points at the test container.
+ *
+ * <p>Only the calls stubbed below are configured: host and port answer through
+ * {@code getValueOrDefault()}, the legacy Consul location reports itself as absent, and the debug
+ * flag is absent and {@code false}.
+ *
+ * @return the mocked environment
+ */
+ protected Environment mockEnvironment() {
+ var envMock = mock(Environment.class);
+ var hostVariableMock = mock(StringEnvironmentVariable.class);
+ var portVariableMock = mock(IntEnvironmentVariable.class);
+ var consulLocationMock = mock(StringEnvironmentVariable.class);
+ var spDebugMock = mock(BooleanEnvironmentVariable.class);
+
+ // Host and the port mapped to container port 8500 come from the running container.
+ Mockito.when(hostVariableMock.getValueOrDefault()).thenReturn(consulContainer.getHost());
+ Mockito.when(portVariableMock.getValueOrDefault()).thenReturn(consulContainer.getMappedPort(8500));
+ Mockito.when(consulLocationMock.exists()).thenReturn(false);
+ Mockito.when(spDebugMock.exists()).thenReturn(false);
+ Mockito.when(spDebugMock.getValueOrReturn(false)).thenReturn(false);
+
+ Mockito.when(envMock.getConsulHost()).thenReturn(hostVariableMock);
+ Mockito.when(envMock.getConsulPort()).thenReturn(portVariableMock);
+ Mockito.when(envMock.getConsulLocation()).thenReturn(consulLocationMock);
+ Mockito.when(envMock.getSpDebug()).thenReturn(spDebugMock);
+
+ return envMock;
+
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
new file mode 100644
index 000000000..5b14c0a63
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulKvManagementTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
+import com.google.gson.Gson;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for the key-value store obtained from {@link SpServiceDiscovery}, run against
+ * the shared Consul container.
+ */
+public class ConsulKvManagementTest extends AbstractConsulTest {
+
+ private ISpKvManagement kvManagement;
+
+ /** Obtains the key-value store for the mocked environment before each test. */
+ @Before
+ public void before() {
+ var environment = mockEnvironment();
+ this.kvManagement = SpServiceDiscovery
+ .getKeyValueStore(environment);
+ }
+
+ /**
+ * Writes two serialized config items under {@code a/test} and {@code a/test2}, reads route
+ * {@code a} back and expects exactly those two entries with unchanged content.
+ */
+ @Test
+ public void testKv() {
+ var test1Key = "a/test";
+ var test2Key = "a/test2";
+ var configItem1 = makeConfigItem(test1Key, "abc");
+ var configItem2 = makeConfigItem(test2Key, "def");
+ kvManagement.updateConfig(test1Key, serialize(configItem1), false);
+ kvManagement.updateConfig(test2Key, serialize(configItem2), false);
+
+ Map<String, String> result = kvManagement.getKeyValue("a");
+
+ assertEquals(2, result.size());
+ assertEquals(serialize(configItem1), result.get(test1Key));
+ assertEquals(serialize(configItem2), result.get(test2Key));
+ }
+
+ /** Serializes the item to JSON with a fresh {@link Gson} instance. */
+ private String serialize(ConfigItem item) {
+ return new Gson().toJson(item);
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
new file mode 100644
index 000000000..7b64d3d39
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for the {@link SpConfig} obtained from {@link SpServiceDiscovery} for the
+ * service group {@code test-service-group}, run against the shared Consul container.
+ */
+public class ConsulSpConfigTest extends AbstractConsulTest {
+
+ private SpConfig spConfig;
+ /** Obtains the config for the mocked environment before each test. */
+ @Before
+ public void before() {
+ var env = mockEnvironment();
+ this.spConfig = SpServiceDiscovery
+ .getSpConfig("test-service-group", env);
+ }
+
+ /** Registers a string entry and expects to read the same value back. */
+ @Test
+ public void testKvString() {
+
+ spConfig.register("str", "abc", "desc");
+ var value = spConfig.getString("str");
+ assertEquals("abc", value);
+ }
+
+ /** Registers an integer entry and expects to read the same value back. */
+ @Test
+ public void testKvInteger() {
+ spConfig.register("int", 2, "desc");
+ var value = spConfig.getInteger("int");
+ assertEquals(2, value);
+ }
+
+ /**
+ * Registers a whole config item and expects an equal item back.
+ * NOTE(review): relies on ConfigItem defining value-based equals - confirm.
+ */
+ @Test
+ public void testKvConfigItem() {
+ var configItem = makeConfigItem("config", "value");
+ spConfig.register(configItem);
+ var value = spConfig.getConfigItem("config");
+ assertEquals(configItem, value);
+ }
+
+ /** Registers a string entry, overwrites it with {@code setString} and expects the new value. */
+ @Test
+ public void testUpdateValue() {
+ spConfig.register("stru", "abc", "desc");
+ var value = spConfig.getString("stru");
+ assertEquals("abc", value);
+ spConfig.setString("stru", "abc2");
+ var result = spConfig.getString("stru");
+ assertEquals("abc2", result);
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
new file mode 100644
index 000000000..0a4093c30
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/svcdiscovery/ConsulSpServiceDiscoveryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.svcdiscovery;
+
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Exercises service registration, deregistration and extensions service group lookup
+ * through the {@link ISpServiceDiscovery} returned by {@link SpServiceDiscovery}.
+ */
+public class ConsulSpServiceDiscoveryTest extends AbstractConsulTest {
+
+  private ISpServiceDiscovery serviceDiscovery;
+
+  // Obtains the service discovery client for the mocked environment before every test.
+  @Before
+  public void before() {
+    this.serviceDiscovery = SpServiceDiscovery.getServiceDiscovery(mockEnvironment());
+  }
+
+  // One endpoint is listed after registering "abc" and none after deregistering it.
+  @Test
+  public void testServiceRegistration() {
+    serviceDiscovery.registerService(makeRequest(makeServiceTag(), "abc"));
+    assertEquals(1, getEndpoints(false).size());
+
+    serviceDiscovery.deregisterService("abc");
+    assertEquals(0, getEndpoints(false).size());
+  }
+
+  // Registers a PE-tagged and a CORE-tagged service that both carry the "my-group" tag and
+  // expects exactly one extensions service group, keyed "my-group".
+  @Test
+  public void testGetExtensionsServiceGroups() {
+    var peService = makeRequest(List.of(DefaultSpServiceTags.PE, groupTag()), "abc");
+    serviceDiscovery.registerService(peService);
+    var coreService = makeRequest(List.of(DefaultSpServiceTags.CORE, groupTag()), "def");
+    serviceDiscovery.registerService(coreService);
+
+    var serviceGroups = serviceDiscovery.getExtensionsServiceGroups();
+    assertEquals(1, serviceGroups.size());
+    assertTrue(serviceGroups.containsKey("my-group"));
+
+    serviceDiscovery.deregisterService("abc");
+    serviceDiscovery.deregisterService("def");
+  }
+
+  // Looks up EXT-group endpoints filtered by the string form of the first default service tag.
+  private List<String> getEndpoints(boolean healthy) {
+    List<String> tagFilter = List.of(makeServiceTag().get(0).asString());
+    return serviceDiscovery.getServiceEndpoints(DefaultSpServiceGroups.EXT, healthy, tagFilter);
+  }
+
+  // Builds an EXT-group registration request for localhost:80 with the given tags and id.
+  private SpServiceRegistrationRequest makeRequest(List<SpServiceTag> serviceTags,
+                                                   String serviceId) {
+    return new SpServiceRegistrationRequest(
+        DefaultSpServiceGroups.EXT,
+        serviceId,
+        "localhost",
+        80,
+        serviceTags,
+        "/");
+  }
+
+  // Service tag with the SP_GROUP prefix and the value "my-group".
+  private SpServiceTag groupTag() {
+    return SpServiceTag.create(SpServiceTagPrefix.SP_GROUP, "my-group");
+  }
+
+  // Default tag list used for registration and lookup: the PE tag only.
+  private List<SpServiceTag> makeServiceTag() {
+    return List.of(DefaultSpServiceTags.PE);
+  }
+}
diff --git a/streampipes-integration-tests/src/test/resources/logback-test.xml b/streampipes-integration-tests/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..d47f8f1cd
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<configuration debug="false" scan="true">
+ <!-- Console appender writing to System.out; its ThresholdFilter rejects events below DEBUG. -->
+ <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+ <!-- The root logger is set to INFO and sends its output to the STDOUT appender. -->
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
diff --git a/streampipes-sdk/pom.xml b/streampipes-sdk/pom.xml
index 0f7b7aaf3..889aa9e5d 100644
--- a/streampipes-sdk/pom.xml
+++ b/streampipes-sdk/pom.xml
@@ -47,6 +47,10 @@
</dependency>
<!-- External dependencies -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<dependency>
<groupId>com.github.drapostolos</groupId>
<artifactId>type-parser</artifactId>
@@ -71,4 +75,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
index 6c7583df1..4b859f3c6 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpKvManagement.java
@@ -21,8 +21,6 @@ import java.util.Map;
public interface ISpKvManagement {
- <T> T getValueForRoute(String route, Class<T> type);
-
Map<String, String> getKeyValue(String route);
void updateConfig(String key, String entry, boolean password);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
index c3dcc0415..803f4e43b 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.svcdiscovery.api.model;
+import java.util.Objects;
+
public class ConfigItem {
private String key;
@@ -117,4 +119,26 @@ public class ConfigItem {
public void setConfigurationScope(ConfigurationScope configurationScope) {
this.configurationScope = configurationScope;
}
+
+  // Equal when the runtime classes match and key, description, value, valueType,
+  // configurationScope and isPassword all match.
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    ConfigItem other = (ConfigItem) o;
+    return Objects.equals(key, other.key)
+        && Objects.equals(description, other.description)
+        && Objects.equals(value, other.value) && Objects.equals(valueType, other.valueType)
+        && configurationScope == other.configurationScope && isPassword == other.isPassword;
+  }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, description, value, valueType, configurationScope, isPassword); // same fields as equals()
+ }
}
diff --git a/streampipes-service-discovery-consul/pom.xml b/streampipes-service-discovery-consul/pom.xml
index b1cedc09e..a10cad9ef 100644
--- a/streampipes-service-discovery-consul/pom.xml
+++ b/streampipes-service-discovery-consul/pom.xml
@@ -46,8 +46,13 @@
<!-- third-party dependencies -->
<dependency>
- <groupId>com.orbitz.consul</groupId>
- <artifactId>consul-client</artifactId>
+ <groupId>com.ecwid.consul</groupId>
+ <artifactId>consul-api</artifactId>
+ <version>1.4.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
index aff0f18c3..d89b387ac 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
@@ -17,17 +17,19 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
+import org.apache.streampipes.commons.environment.Environment;
+
+import com.ecwid.consul.v1.ConsulClient;
public abstract class AbstractConsulService {
- protected ConsulHealthServiceManager consul;
+ private final Environment environment;
- public AbstractConsulService() {
- this.consul = ConsulHealthServiceManager.INSTANCE;
+ public AbstractConsulService(Environment environment) {
+ this.environment = environment;
}
- public Consul consulInstance() {
- return this.consul.consulInstance();
+ public ConsulClient consulInstance() {
+ return ConsulProvider.INSTANCE.getConsulInstance(environment);
}
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
index d8d2d642b..cb1710483 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
@@ -17,90 +17,64 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.commons.environment.Environment;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.HealthClient;
-import com.orbitz.consul.cache.ServiceHealthCache;
-import com.orbitz.consul.model.health.ServiceHealth;
-import com.orbitz.consul.option.QueryOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.Check;
+import com.ecwid.consul.v1.health.model.HealthService;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public enum ConsulHealthServiceManager {
+public class ConsulHealthServiceManager extends AbstractConsulService {
- INSTANCE;
-
- private static final Logger LOG = LoggerFactory.getLogger(ConsulHealthServiceManager.class);
private static final int MAX_RETRIES = 3;
- private final Consul client;
- private final Map<String, ServiceHealthCache> serviceHealthCaches;
- ConsulHealthServiceManager() {
- serviceHealthCaches = new HashMap<>();
- client = new ConsulProvider().consulInstance();
- initializeAll();
+ public ConsulHealthServiceManager(Environment environment) {
+ super(environment);
+ initializeCache();
}
- public void initializeAll() {
- initialize(DefaultSpServiceGroups.CORE);
- initialize(DefaultSpServiceGroups.EXT);
- }
+ public void initializeCache() {
- public void initialize(String serviceGroup) {
- HealthClient healthClient = client.healthClient();
- ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, serviceGroup, false, 9, QueryOptions.BLANK);
- svHealth.start();
- try {
- svHealth.awaitInitialized(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- serviceHealthCaches.put(serviceGroup, svHealth);
}
public List<String> getServiceEndpoints(String serviceGroup,
boolean restrictToHealthy,
List<String> filterByTags) {
- List<ServiceHealth> activeServices = findService(serviceGroup, 0);
+ List<HealthService> activeServices = findService(serviceGroup, 0);
return activeServices
.stream()
.filter(service -> allFiltersSupported(service, filterByTags))
.filter(service -> !restrictToHealthy
- || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
+ || service.getChecks().stream().allMatch(check -> check.getStatus() == Check.CheckStatus.PASSING))
.map(this::makeServiceUrl)
.collect(Collectors.toList());
}
- private String makeServiceUrl(ServiceHealth service) {
+ private String makeServiceUrl(HealthService service) {
return service.getService().getAddress() + ":" + service.getService().getPort();
}
- private boolean allFiltersSupported(ServiceHealth service,
+ private boolean allFiltersSupported(HealthService service,
List<String> filterByTags) {
- return service.getService().getTags().containsAll(filterByTags);
+ return new HashSet<>(service.getService().getTags()).containsAll(filterByTags);
}
- private List<ServiceHealth> findService(String serviceGroup, int retryCount) {
-
- if (serviceHealthCaches.containsKey(serviceGroup)
- && serviceHealthCaches.get(serviceGroup).getMap() != null) {
- ServiceHealthCache cache = serviceHealthCaches.get(serviceGroup);
- return cache
- .getMap()
- .values()
- .stream()
- .filter((value) -> value.getService().getService().equals(serviceGroup))
- .collect(Collectors.toList());
- } else {
+ private List<HealthService> findService(String serviceGroup, int retryCount) {
+ HealthServicesRequest request = HealthServicesRequest.newBuilder()
+ .setPassing(true)
+ .setQueryParams(QueryParams.DEFAULT)
+ .build();
+ Response<List<HealthService>> healthyServicesResp = consulInstance().getHealthServices(serviceGroup, request);
+ var healthyServices = healthyServicesResp.getValue();
+ if (healthyServices.size() == 0) {
if (retryCount < MAX_RETRIES) {
try {
retryCount++;
@@ -113,10 +87,10 @@ public enum ConsulHealthServiceManager {
} else {
return Collections.emptyList();
}
+ } else {
+ return healthyServices;
}
}
- public Consul consulInstance() {
- return this.client;
- }
+
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index 69654eaba..cd7282d32 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -18,9 +18,9 @@
package org.apache.streampipes.svcdiscovery.consul;
import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.ConsulClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,47 +29,52 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
-public class ConsulProvider {
+public enum ConsulProvider {
+
+ INSTANCE;
private static final Logger LOG = LoggerFactory.getLogger(ConsulProvider.class);
private static final int CHECK_INTERVAL = 1;
- private final String consulHost;
- private final String consulUrlString;
- private final int consulPort;
+ private ConsulClient consulClient;
+ private boolean initialized = false;
- public ConsulProvider() {
- this.consulHost = getConsulHost();
- this.consulPort = getConsulPort();
- this.consulUrlString = makeConsulUrl();
+ ConsulProvider() {
}
- public Consul consulInstance() {
- boolean connected;
+ public ConsulClient getConsulInstance(Environment environment) {
+ if (!initialized) {
+ createConsulInstance(environment);
+ }
- do {
- LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
- connected = checkConsulAvailable();
+ return consulClient;
+ }
- if (!connected) {
- LOG.info("Retrying in {} second", CHECK_INTERVAL);
- try {
- TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } while (!connected);
+ private void createConsulInstance(Environment environment) {
+ var consulHost = getConsulHost(environment);
+ var consulPort = getConsulPort(environment);
+ var connected = false;
- LOG.info("Successfully connected to Consul");
- return Consul.builder().withUrl(consulUrlString).build();
- }
+ LOG.info("Checking if consul is available on host {} and port {}", consulHost, consulPort);
+ connected = checkConsulAvailable(consulHost, consulPort);
- public String makeConsulUrl() {
- return "http://" + consulHost + ":" + consulPort;
+ if (connected) {
+ LOG.info("Successfully connected to Consul on host {}", consulHost);
+ this.consulClient = new ConsulClient(consulHost, consulPort);
+ this.initialized = true;
+ } else {
+ LOG.info("Retrying in {} second", CHECK_INTERVAL);
+ try {
+ TimeUnit.SECONDS.sleep(CHECK_INTERVAL);
+ createConsulInstance(environment);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
- private boolean checkConsulAvailable() {
+ private boolean checkConsulAvailable(String consulHost,
+ int consulPort) {
try {
InetSocketAddress sa = new InetSocketAddress(consulHost, consulPort);
Socket ss = new Socket();
@@ -83,18 +88,18 @@ public class ConsulProvider {
}
}
- private int getConsulPort() {
- return Envs.SP_CONSUL_PORT.getValueAsIntOrDefault(DefaultEnvValues.CONSUL_PORT_DEFAULT);
+ private int getConsulPort(Environment environment) {
+ return environment.getConsulPort().getValueOrDefault();
}
- private String getConsulHost() {
- if (Envs.SP_CONSUL_LOCATION.exists()) {
- return Envs.SP_CONSUL_LOCATION.getValue();
+ private String getConsulHost(Environment environment) {
+ if (environment.getConsulLocation().exists()) {
+ return environment.getConsulLocation().getValue();
} else {
- if (Envs.SP_DEBUG.getValueAsBooleanOrDefault(false)) {
+ if (environment.getSpDebug().getValueOrReturn(false)) {
return DefaultEnvValues.CONSUL_HOST_LOCAL;
} else {
- return Envs.SP_CONSUL_HOST.getValueOrDefault(DefaultEnvValues.CONSUL_HOST_DEFAULT);
+ return environment.getConsulHost().getValueOrDefault();
}
}
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index 9f3f6ddec..73770eeea 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -18,35 +18,30 @@
package org.apache.streampipes.svcdiscovery.consul;
+import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItemUtils;
import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
+import com.ecwid.consul.v1.ConsulClient;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.Optional;
-
public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
private static final String SLASH = "/";
private final String serviceName;
- private final KeyValueClient kvClient;
-
- // TODO Implement mechanism to update the client when some configuration parameters change in Consul
- private Map<String, Object> configProps;
+ private final ConsulClient kvClient;
- public ConsulSpConfig(String serviceName) {
- Consul consul = consulInstance();
- this.kvClient = consul.keyValueClient();
+ public ConsulSpConfig(String serviceName,
+ Environment environment) {
+ super(environment);
+ this.kvClient = consulInstance();
this.serviceName = serviceName;
}
@@ -82,9 +77,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
@Override
public void registerObject(String key, Object defaultValue, String description) {
- Optional<String> i = kvClient.getValueAsString(addSn(key));
- if (!i.isPresent()) {
- kvClient.putValue(addSn(key), toJson(defaultValue));
+ var i = kvClient.getKVValue(addSn(key));
+ if (i.getValue() == null) {
+ kvClient.setKVValue(addSn(key), toJson(defaultValue));
}
}
@@ -96,28 +91,28 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
@Override
public void register(ConfigItem configItem) {
String key = addSn(configItem.getKey());
- Optional<String> i = kvClient.getValueAsString(key);
+ var i = kvClient.getKVValue(key);
- if (!i.isPresent()) {
+ if (i.getValue() == null) {
// Set the value of environment variable as default
String envVariable = System.getenv(configItem.getKey());
if (envVariable != null) {
configItem.setValue(envVariable);
- kvClient.putValue(key, toJson(configItem));
+ kvClient.setKVValue(key, toJson(configItem));
} else {
- kvClient.putValue(key, toJson(configItem));
+ kvClient.setKVValue(key, toJson(configItem));
}
}
}
- private void register(String key, String defaultValue, String valueType, String description,
- ConfigurationScope configurationScope, boolean isPassword) {
+ private void register(String key,
+ String defaultValue,
+ String valueType,
+ String description,
+ ConfigurationScope configurationScope,
+ boolean isPassword) {
ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
register(configItem);
-
- if (configProps != null) {
- configProps.put(key, getString(key));
- }
}
@Override
@@ -142,10 +137,10 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
@Override
public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
- Optional<String> os = kvClient.getValueAsString(addSn(key));
- if (os.isPresent()) {
+ var os = kvClient.getKVValue(addSn(key));
+ if (os.getValue() != null) {
try {
- return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
+ return JacksonSerializer.getObjectMapper().readValue(os.getValue().getDecodedValue(), clazz);
} catch (JsonProcessingException e) {
LOG.info("Could not deserialize object", e);
return defaultValue;
@@ -157,9 +152,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
@Override
public ConfigItem getConfigItem(String key) {
- Optional<String> os = kvClient.getValueAsString(addSn(key));
+ var os = kvClient.getKVValue(addSn(key)).getValue();
- return fromJson(os.get());
+ return fromJson(os.getDecodedValue());
}
@Override
@@ -179,12 +174,12 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
@Override
public void setString(String key, String value) {
- kvClient.putValue(addSn(key), value);
+ kvClient.setKVValue(addSn(key), value);
}
@Override
public void setObject(String key, Object value) {
- kvClient.putValue(addSn(key), toJson(value));
+ kvClient.setKVValue(addSn(key), toJson(value));
}
private String addSn(String key) {
@@ -202,7 +197,9 @@ public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
}
}
- private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope,
+ private ConfigItem prepareConfigItem(String valueType,
+ String description,
+ ConfigurationScope configurationScope,
boolean password) {
ConfigItem configItem = new ConfigItem();
configItem.setValueType(valueType);
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
similarity index 61%
copy from streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
copy to streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
index aff0f18c3..1020553cd 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/AbstractConsulService.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ServiceCache.java
@@ -15,19 +15,30 @@
* limitations under the License.
*
*/
+
package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
-public abstract class AbstractConsulService {
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
- protected ConsulHealthServiceManager consul;
+public enum ServiceCache {
- public AbstractConsulService() {
- this.consul = ConsulHealthServiceManager.INSTANCE;
- }
+ INSTANCE;
+
+ private LoadingCache<String, List<HealthService>> cache;
+
+ ServiceCache() {
- public Consul consulInstance() {
- return this.consul.consulInstance();
}
+
+
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
index 0138114a5..31247feca 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
@@ -17,15 +17,11 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
-import com.orbitz.consul.model.ConsulResponse;
-import com.orbitz.consul.model.kv.Value;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,47 +29,28 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagement {
+public class SpConsulKvManagement extends AbstractConsulService implements ISpKvManagement {
private static final Logger LOG = LoggerFactory.getLogger(SpConsulKvManagement.class);
private static final String CONSUL_NAMESPACE = "/sp/v1/";
- public <T> T getValueForRoute(String route, Class<T> type) {
- try {
- String entry = getKeyValue(route)
- .values()
- .stream()
- .findFirst()
- .orElse(null);
-
- if (type.equals(Integer.class)) {
- return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
- } else if (type.equals(Boolean.class)) {
- return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
- } else {
- return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
- }
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- throw new IllegalArgumentException("Cannot get entry from Consul");
+ public SpConsulKvManagement(Environment environment) {
+ super(environment);
}
public Map<String, String> getKeyValue(String route) {
- Consul consul = consulInstance();
- KeyValueClient keyValueClient = consul.keyValueClient();
+ var consul = consulInstance();
Map<String, String> keyValues = new HashMap<>();
+ Response<List<GetValue>> consulResponseWithValues = consul.getKVValues(route);
- ConsulResponse<List<Value>> consulResponseWithValues = keyValueClient.getConsulResponseWithValues(route);
-
- if (consulResponseWithValues.getResponse() != null) {
- for (Value value : consulResponseWithValues.getResponse()) {
+ if (consulResponseWithValues.getValue() != null) {
+ for (GetValue value : consulResponseWithValues.getValue()) {
String key = value.getKey();
String v = "";
- if (value.getValueAsString().isPresent()) {
- v = value.getValueAsString().get();
+ if (value.getValue() != null) {
+ v = value.getDecodedValue();
}
keyValues.put(key, v);
}
@@ -82,16 +59,16 @@ public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagem
}
public void updateConfig(String key, String entry, boolean password) {
- Consul consul = consulInstance();
+ var consul = consulInstance();
if (!password) {
LOG.info("Updated config - key:" + key + " value: " + entry);
- consul.keyValueClient().putValue(key, entry);
+ consul.setKVValue(key, entry);
}
}
public void deleteConfig(String key) {
- Consul consul = consulInstance();
+ var consul = consulInstance();
LOG.info("Delete config: {}", key);
- consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
+ consul.deleteKVValue(CONSUL_NAMESPACE + key);
}
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 8156e2545..a00acf575 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
+import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
@@ -24,13 +25,10 @@ import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationReques
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
-import com.orbitz.consul.AgentClient;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.model.agent.ImmutableRegCheck;
-import com.orbitz.consul.model.agent.ImmutableRegistration;
-import com.orbitz.consul.model.agent.Registration;
-import com.orbitz.consul.model.health.HealthCheck;
-import com.orbitz.consul.model.health.Service;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.Check;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.agent.model.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +47,16 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
private static final String COLON = ":";
private static final String HEALTH_CHECK_INTERVAL = "10s";
+ private ConsulHealthServiceManager healthServiceManager;
+
+ public SpConsulServiceDiscovery(Environment environment) {
+ super(environment);
+ this.healthServiceManager = new ConsulHealthServiceManager(environment);
+ }
+
@Override
public void registerService(SpServiceRegistrationRequest req) {
- consulInstance().agentClient().register((createRegistrationBody(req)));
+ consulInstance().agentServiceRegister(createRegistrationBody(req));
LOG.info("Successfully registered service at Consul: " + req.getSvcId());
}
@@ -75,30 +80,32 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
@Override
public List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy, List<String> filterByTags) {
- return ConsulHealthServiceManager.INSTANCE.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
+ return healthServiceManager.getServiceEndpoints(svcGroup, restrictToHealthy, filterByTags);
}
@Override
public Map<String, String> getExtensionsServiceGroups() {
LOG.info("Load pipeline element service status");
- Consul consul = consulInstance();
- AgentClient agent = consul.agentClient();
+ var consul = consulInstance();
- Map<String, Service> services = consul.agentClient().getServices();
- Map<String, HealthCheck> checks = agent.getChecks();
+ Response<Map<String, Service>> servicesResp = consul.getAgentServices();
+ Response<Map<String, Check>> checksResp = consul.getAgentChecks();
Map<String, String> peSvcs = new HashMap<>();
-
- for (Map.Entry<String, Service> entry : services.entrySet()) {
- if (hasExtensionsTag(entry.getValue().getTags())) {
- String serviceId = entry.getValue().getId();
- String serviceStatus = "critical";
- if (checks.containsKey("service:" + entry.getKey())) {
- serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
+ if (servicesResp.getValue() != null) {
+ var services = servicesResp.getValue();
+ var checks = checksResp.getValue();
+ for (Map.Entry<String, Service> entry : services.entrySet()) {
+ if (hasExtensionsTag(entry.getValue().getTags())) {
+ String serviceId = entry.getValue().getId();
+ String serviceStatus = "critical";
+ if (checks.containsKey("service:" + entry.getKey())) {
+ serviceStatus = checks.get("service:" + entry.getKey()).getStatus().name();
+ }
+ LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
+ String serviceGroup = extractServiceGroup(entry.getValue().getTags());
+ peSvcs.put(serviceGroup, serviceStatus);
}
- LOG.info("Service id: " + serviceId + " service status: " + serviceStatus);
- String serviceGroup = extractServiceGroup(entry.getValue().getTags());
- peSvcs.put(serviceGroup, serviceStatus);
}
}
return peSvcs;
@@ -117,25 +124,33 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
@Override
public void deregisterService(String svcId) {
- Consul consul = consulInstance();
+ var consul = consulInstance();
LOG.info("Deregister service: " + svcId);
- consul.agentClient().deregister(svcId);
+ consul.agentServiceDeregister(svcId);
+ }
+
+ private NewService createRegistrationBody(SpServiceRegistrationRequest req) {
+ var service = new NewService();
+ service.setId(req.getSvcId());
+ service.setName(req.getSvcGroup());
+ service.setPort(req.getPort());
+ service.setAddress(HTTP_PROTOCOL + req.getHost());
+ service.setCheck(createServiceCheck(req));
+
+ service.setTags(asString(req.getTags()));
+ service.setEnableTagOverride(true);
+
+ return service;
}
- private Registration createRegistrationBody(SpServiceRegistrationRequest req) {
- return ImmutableRegistration.builder()
- .id(req.getSvcId())
- .name(req.getSvcGroup())
- .port(req.getPort())
- .address(HTTP_PROTOCOL + req.getHost())
- .check(ImmutableRegCheck.builder()
- .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
- .interval(HEALTH_CHECK_INTERVAL)
- .deregisterCriticalServiceAfter("120s")
- .status("passing")
- .build())
- .tags(asString(req.getTags()))
- .enableTagOverride(true)
- .build();
+ private NewService.Check createServiceCheck(SpServiceRegistrationRequest req) {
+ var serviceCheck = new NewService.Check();
+
+ serviceCheck.setHttp(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath());
+ serviceCheck.setInterval(HEALTH_CHECK_INTERVAL);
+ serviceCheck.setDeregisterCriticalServiceAfter("120s");
+ serviceCheck.setStatus("passing");
+
+ return serviceCheck;
}
}
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
index 8f836b077..cead1334b 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.svcdiscovery;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
@@ -27,15 +29,28 @@ import org.apache.streampipes.svcdiscovery.consul.SpConsulServiceDiscovery;
public class SpServiceDiscovery {
public static ISpServiceDiscovery getServiceDiscovery() {
- return new SpConsulServiceDiscovery();
+ return new SpConsulServiceDiscovery(Environments.getEnvironment());
+ }
+
+ public static ISpServiceDiscovery getServiceDiscovery(Environment environment) {
+ return new SpConsulServiceDiscovery(environment);
}
public static ISpKvManagement getKeyValueStore() {
- return new SpConsulKvManagement();
+ return new SpConsulKvManagement(Environments.getEnvironment());
+ }
+
+ public static ISpKvManagement getKeyValueStore(Environment environment) {
+ return new SpConsulKvManagement(environment);
}
public static SpConfig getSpConfig(String serviceGroup) {
- return new ConsulSpConfig(serviceGroup);
+ return getSpConfig(serviceGroup, Environments.getEnvironment());
+ }
+
+ public static SpConfig getSpConfig(String serviceGroup,
+ Environment environment) {
+ return new ConsulSpConfig(serviceGroup, environment);
}
}
diff --git a/streampipes-storage-couchdb/pom.xml b/streampipes-storage-couchdb/pom.xml
index 4a0c28cdb..f0346cd60 100644
--- a/streampipes-storage-couchdb/pom.xml
+++ b/streampipes-storage-couchdb/pom.xml
@@ -56,6 +56,10 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<dependency>
<groupId>org.lightcouch</groupId>
<artifactId>lightcouch</artifactId>