You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/04 18:23:06 UTC
[streampipes] branch dev updated: add-checkstyle-streampipes-service (#845)
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 19d4f6ca1 add-checkstyle-streampipes-service (#845)
19d4f6ca1 is described below
commit 19d4f6ca1534bc1067e6f684514d89140ed19e2e
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Sun Dec 4 19:23:01 2022 +0100
add-checkstyle-streampipes-service (#845)
* add checkstyle to streampipes-service-base
* add checkstyle to streampipes-service-discovery
* add checkstyle to streampipes-service-discovery-api
* add checkstyle to streampipes-service-discovery-consul
* add checkstyle to streampipes-service-extension-base
---
streampipes-service-base/pom.xml | 24 +-
.../service/base/BaseNetworkingConfig.java | 12 +-
.../service/base/StreamPipesServiceBase.java | 22 +-
.../service/base/rest/BaseResourceConfig.java | 2 +-
.../security/UnauthorizedRequestEntryPoint.java | 14 +-
.../src/main/resources/logback-spring.xml | 2 +-
streampipes-service-discovery-api/pom.xml | 25 +-
.../svcdiscovery/api/ISpServiceDiscovery.java | 12 +-
.../streampipes/svcdiscovery/api/SpConfig.java | 38 +--
.../svcdiscovery/api/model/ConfigItem.java | 11 +-
.../api/model/DefaultSpServiceTags.java | 3 +-
.../svcdiscovery/api/model/PeConfig.java | 72 ++--
.../api/model/SpServiceRegistrationRequest.java | 28 +-
.../svcdiscovery/api/model/SpServiceTag.java | 11 +-
.../api/model/SpServiceUrlProvider.java | 34 +-
streampipes-service-discovery-consul/pom.xml | 25 +-
.../consul/ConsulHealthServiceManager.java | 29 +-
.../svcdiscovery/consul/ConsulProvider.java | 5 +-
.../svcdiscovery/consul/ConsulSpConfig.java | 376 +++++++++++----------
.../svcdiscovery/consul/SpConsulKvManagement.java | 15 +-
.../consul/SpConsulServiceDiscovery.java | 45 +--
streampipes-service-discovery/pom.xml | 26 +-
.../svcdiscovery/SpServiceDiscovery.java | 2 +-
streampipes-service-extensions-base/pom.xml | 25 +-
.../base/StreamPipesExtensionsServiceBase.java | 111 +++---
.../service/extensions/base/WebSecurityConfig.java | 37 +-
...StreamPipesClientRuntimeConnectionResolver.java | 13 +-
.../base/security/TokenAuthenticationFilter.java | 6 +-
.../base/security/UnauthenticatedInterfaces.java | 12 +-
29 files changed, 587 insertions(+), 450 deletions(-)
diff --git a/streampipes-service-base/pom.xml b/streampipes-service-base/pom.xml
index 4c5811a3a..eda5680f4 100644
--- a/streampipes-service-base/pom.xml
+++ b/streampipes-service-base/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -113,4 +114,25 @@
<artifactId>hibernate-validator</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/BaseNetworkingConfig.java b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/BaseNetworkingConfig.java
index bfcd1048f..5cec0724a 100644
--- a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/BaseNetworkingConfig.java
+++ b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/BaseNetworkingConfig.java
@@ -26,6 +26,12 @@ public class BaseNetworkingConfig {
private final String host;
private final Integer port;
+ public BaseNetworkingConfig(String host,
+ Integer port) {
+ this.host = host;
+ this.port = port;
+ }
+
public static BaseNetworkingConfig defaultResolution(Integer defaultPort) throws UnknownHostException {
String host = Networking.getHostname();
Integer port = Networking.getPort(defaultPort);
@@ -33,12 +39,6 @@ public class BaseNetworkingConfig {
return new BaseNetworkingConfig(host, port);
}
- public BaseNetworkingConfig(String host,
- Integer port) {
- this.host = host;
- this.port = port;
- }
-
public String getHost() {
return host;
}
diff --git a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/StreamPipesServiceBase.java b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/StreamPipesServiceBase.java
index 593e14258..f733921a2 100644
--- a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/StreamPipesServiceBase.java
+++ b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/StreamPipesServiceBase.java
@@ -17,10 +17,11 @@
*/
package org.apache.streampipes.service.base;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+
+import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -31,9 +32,8 @@ import java.util.List;
public abstract class StreamPipesServiceBase {
- private static final Logger LOG = LoggerFactory.getLogger(StreamPipesServiceBase.class);
-
public static final String AUTO_GENERATED_SERVICE_ID = RandomStringUtils.randomAlphanumeric(6);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamPipesServiceBase.class);
protected void startStreamPipesService(Class<?> serviceClass,
String serviceGroup,
@@ -54,16 +54,16 @@ public abstract class StreamPipesServiceBase {
String serviceId,
BaseNetworkingConfig networkingConfig) {
SpServiceRegistrationRequest req = SpServiceRegistrationRequest.from(
- serviceGroup,
- serviceId,
- networkingConfig.getHost(),
- networkingConfig.getPort(),
- getServiceTags(),
- getHealthCheckPath());
+ serviceGroup,
+ serviceId,
+ networkingConfig.getHost(),
+ networkingConfig.getPort(),
+ getServiceTags(),
+ getHealthCheckPath());
SpServiceDiscovery
- .getServiceDiscovery()
- .registerService(req);
+ .getServiceDiscovery()
+ .registerService(req);
}
protected abstract List<SpServiceTag> getServiceTags();
diff --git a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/rest/BaseResourceConfig.java b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/rest/BaseResourceConfig.java
index 7e5d1d77d..3eb8c9279 100644
--- a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/rest/BaseResourceConfig.java
+++ b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/rest/BaseResourceConfig.java
@@ -27,7 +27,7 @@ public abstract class BaseResourceConfig extends ResourceConfig {
public BaseResourceConfig() {
property(ServletProperties.FILTER_FORWARD_ON_404, true);
getClassesToRegister()
- .forEach(set -> set.forEach(this::register));
+ .forEach(set -> set.forEach(this::register));
register(ServiceHealthResource.class);
}
diff --git a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/security/UnauthorizedRequestEntryPoint.java b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/security/UnauthorizedRequestEntryPoint.java
index 91528c8ee..50cb7eafd 100644
--- a/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/security/UnauthorizedRequestEntryPoint.java
+++ b/streampipes-service-base/src/main/java/org/apache/streampipes/service/base/security/UnauthorizedRequestEntryPoint.java
@@ -25,16 +25,18 @@ import org.springframework.security.web.AuthenticationEntryPoint;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import java.io.IOException;
public class UnauthorizedRequestEntryPoint implements AuthenticationEntryPoint {
- private static final Logger LOG = LoggerFactory.getLogger(UnauthorizedRequestEntryPoint.class);
+ private static final Logger LOG = LoggerFactory.getLogger(UnauthorizedRequestEntryPoint.class);
- @Override
- public void commence(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, AuthenticationException e) throws IOException {
- LOG.error("Unauthorized request to {}", httpServletRequest.getPathInfo());
+ @Override
+ public void commence(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
+ AuthenticationException e) throws IOException {
+ LOG.error("Unauthorized request to {}", httpServletRequest.getPathInfo());
- httpServletResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, e.getLocalizedMessage());
- }
+ httpServletResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, e.getLocalizedMessage());
+ }
}
diff --git a/streampipes-service-base/src/main/resources/logback-spring.xml b/streampipes-service-base/src/main/resources/logback-spring.xml
index b2e2e8a1b..1331d020d 100644
--- a/streampipes-service-base/src/main/resources/logback-spring.xml
+++ b/streampipes-service-base/src/main/resources/logback-spring.xml
@@ -33,6 +33,6 @@
<root level="INFO">
- <appender-ref ref="CONSOLE" />
+ <appender-ref ref="CONSOLE"/>
</root>
</configuration>
diff --git a/streampipes-service-discovery-api/pom.xml b/streampipes-service-discovery-api/pom.xml
index fd73b1de5..a2ebacb8d 100644
--- a/streampipes-service-discovery-api/pom.xml
+++ b/streampipes-service-discovery-api/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -27,5 +28,25 @@
<artifactId>streampipes-service-discovery-api</artifactId>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
index c9159bd9f..729739229 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
@@ -27,7 +27,7 @@ public interface ISpServiceDiscovery {
/**
* Register service.
*
- * @param serviceRegistrationRequest the service registration request
+ * @param serviceRegistrationRequest the service registration request
*/
void registerService(SpServiceRegistrationRequest serviceRegistrationRequest);
@@ -48,10 +48,10 @@ public interface ISpServiceDiscovery {
/**
* Get service endpoints
*
- * @param svcGroup service group for registered service
- * @param restrictToHealthy retrieve healthy or all registered services for a service group
- * @param filterByTags filter param to filter list of registered services
- * @return list of services
+ * @param svcGroup service group for registered service
+ * @param restrictToHealthy retrieve healthy or all registered services for a service group
+ * @param filterByTags filter param to filter list of registered services
+ * @return list of services
*/
List<String> getServiceEndpoints(String svcGroup,
boolean restrictToHealthy,
@@ -67,7 +67,7 @@ public interface ISpServiceDiscovery {
/**
* Deregister registered service endpoint in Consul
*
- * @param svcId service id of endpoint to be deregistered
+ * @param svcId service id of endpoint to be deregistered
*/
void deregisterService(String svcId);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java
index 892302fd2..caab3e6ff 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java
@@ -23,42 +23,42 @@ import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
public interface SpConfig {
- <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope);
+ <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope);
- void register(String key, boolean defaultValue, String description);
+ void register(String key, boolean defaultValue, String description);
- void register(String key, int defaultValue, String description);
+ void register(String key, int defaultValue, String description);
- void register(String key, double defaultValue, String description);
+ void register(String key, double defaultValue, String description);
- void register(String key, String defaultValue, String description);
+ void register(String key, String defaultValue, String description);
- void register(ConfigItem configItem);
+ void register(ConfigItem configItem);
- void registerObject(String key, Object defaultValue, String description);
+ void registerObject(String key, Object defaultValue, String description);
- void registerPassword(String key, String defaultValue, String description);
+ void registerPassword(String key, String defaultValue, String description);
- boolean getBoolean(String key);
+ boolean getBoolean(String key);
- int getInteger(String key);
+ int getInteger(String key);
- double getDouble(String key);
+ double getDouble(String key);
- String getString(String key);
+ String getString(String key);
- <T> T getObject(String key, Class<T> clazz, T defaultValue);
+ <T> T getObject(String key, Class<T> clazz, T defaultValue);
- ConfigItem getConfigItem(String key);
+ ConfigItem getConfigItem(String key);
- void setBoolean(String key, Boolean value);
+ void setBoolean(String key, Boolean value);
- void setInteger(String key, int value);
+ void setInteger(String key, int value);
- void setDouble(String key, double value);
+ void setDouble(String key, double value);
- void setString(String key, String value);
+ void setString(String key, String value);
- void setObject(String key, Object value);
+ void setObject(String key, Object value);
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
index b44bb9025..c3dcc0415 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/ConfigItem.java
@@ -27,6 +27,10 @@ public class ConfigItem {
private ConfigurationScope configurationScope;
private boolean isPassword;
+ public ConfigItem() {
+ setPassword(false);
+ }
+
public static <T> ConfigItem from(String key,
T defaultValue,
String description) {
@@ -45,7 +49,8 @@ public class ConfigItem {
String description,
ConfigurationScope configurationScope,
boolean isPassword) {
- return from(key, defaultValue, description, ConfigItemUtils.getValueType(defaultValue), configurationScope, isPassword);
+ return from(key, defaultValue, description, ConfigItemUtils.getValueType(defaultValue), configurationScope,
+ isPassword);
}
public static <T> ConfigItem from(String key,
@@ -65,10 +70,6 @@ public class ConfigItem {
return configItem;
}
- public ConfigItem() {
- setPassword(false);
- }
-
public String getKey() {
return key;
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
index fc4f980f3..289211c87 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
@@ -23,5 +23,6 @@ public class DefaultSpServiceTags {
public static final SpServiceTag PE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "pe");
public static final SpServiceTag CONNECT_MASTER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-master");
public static final SpServiceTag CONNECT_WORKER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-worker");
- public static final SpServiceTag STREAMPIPES_CLIENT = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "streampipes-client");
+ public static final SpServiceTag STREAMPIPES_CLIENT =
+ SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "streampipes-client");
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/PeConfig.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/PeConfig.java
index c215a1cd3..d0053e8bf 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/PeConfig.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/PeConfig.java
@@ -23,40 +23,40 @@ import java.util.Map;
public class PeConfig {
- String mainKey;
- String name;
- List<ConfigItem> configs;
- Map<String, String> meta;
-
- public List<ConfigItem> getConfigs() {
- return configs;
- }
-
- public void setConfigs(List<ConfigItem> configs) {
- this.configs = configs;
- }
-
- public Map<String, String> getMeta() {
- return meta;
- }
-
- public void setMeta(Map<String, String> meta) {
- this.meta = meta;
- }
-
- public String getMainKey() {
- return mainKey;
- }
-
- public void setMainKey(String mainKey) {
- this.mainKey = mainKey;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
+ String mainKey;
+ String name;
+ List<ConfigItem> configs;
+ Map<String, String> meta;
+
+ public List<ConfigItem> getConfigs() {
+ return configs;
+ }
+
+ public void setConfigs(List<ConfigItem> configs) {
+ this.configs = configs;
+ }
+
+ public Map<String, String> getMeta() {
+ return meta;
+ }
+
+ public void setMeta(Map<String, String> meta) {
+ this.meta = meta;
+ }
+
+ public String getMainKey() {
+ return mainKey;
+ }
+
+ public void setMainKey(String mainKey) {
+ this.mainKey = mainKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
index 4926cdd99..0b0e6c0d2 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
@@ -28,6 +28,20 @@ public class SpServiceRegistrationRequest {
private List<SpServiceTag> tags;
private String healthCheckPath;
+ public SpServiceRegistrationRequest(String svcGroup,
+ String svcId,
+ String host,
+ int port,
+ List<SpServiceTag> tags,
+ String healthCheckPath) {
+ this.svcGroup = svcGroup;
+ this.svcId = svcId;
+ this.host = host;
+ this.port = port;
+ this.tags = tags;
+ this.healthCheckPath = healthCheckPath;
+ }
+
public static SpServiceRegistrationRequest from(String svcGroup,
String svcId,
String host,
@@ -45,20 +59,6 @@ public class SpServiceRegistrationRequest {
return new SpServiceRegistrationRequest(svcGroup, svcId, host, port, tags, healthCheckPath);
}
- public SpServiceRegistrationRequest(String svcGroup,
- String svcId,
- String host,
- int port,
- List<SpServiceTag> tags,
- String healthCheckPath) {
- this.svcGroup = svcGroup;
- this.svcId = svcId;
- this.host = host;
- this.port = port;
- this.tags = tags;
- this.healthCheckPath = healthCheckPath;
- }
-
public String getSvcGroup() {
return svcGroup;
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
index e003366c7..baabf90f3 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTag.java
@@ -19,21 +19,20 @@ package org.apache.streampipes.svcdiscovery.api.model;
public class SpServiceTag {
+ private static final String COLON = ":";
private final SpServiceTagPrefix prefix;
private final String value;
- private static final String COLON = ":";
+ private SpServiceTag(SpServiceTagPrefix prefix, String value) {
+ this.prefix = prefix;
+ this.value = value;
+ }
public static SpServiceTag create(SpServiceTagPrefix prefix,
String value) {
return new SpServiceTag(prefix, value);
}
- private SpServiceTag(SpServiceTagPrefix prefix, String value) {
- this.prefix = prefix;
- this.value = value;
- }
-
public String asString() {
return prefix.asString() + COLON + value;
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
index fb3c3d5e6..1e02adf14 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
@@ -25,8 +25,8 @@ public enum SpServiceUrlProvider {
DATA_SET(SpServicePathPrefix.DATA_SET, SpServiceTagPrefix.DATA_SET),
ADAPTER(SpServicePathPrefix.ADAPTER, SpServiceTagPrefix.ADAPTER);
- private final String HTTP = "http://";
- private final String SLASH = "/";
+ private final String http = "http://";
+ private final String slash = "/";
private final String prefix;
private final SpServiceTagPrefix serviceTagPrefix;
@@ -44,21 +44,21 @@ public enum SpServiceUrlProvider {
public String getInvocationUrl(String host,
Integer port,
String appId) {
- return HTTP
- + host
- + ":"
- + port
- + SLASH
- + this.prefix
- + SLASH + appId;
+ return http
+ + host
+ + ":"
+ + port
+ + slash
+ + this.prefix
+ + slash + appId;
}
public String getInvocationUrl(String baseUrl, String appId) {
return baseUrl
- + SLASH
- + this.prefix
- + SLASH
- + appId;
+ + slash
+ + this.prefix
+ + slash
+ + appId;
}
public String getDetachUrl(String host,
@@ -66,14 +66,14 @@ public enum SpServiceUrlProvider {
String appId,
String invocationId) {
return getInvocationUrl(host, port, appId)
- + SLASH
- + invocationId;
+ + slash
+ + invocationId;
}
public String getDetachUrl(String baseUrl, String appId, String invocationId) {
return getInvocationUrl(baseUrl, appId)
- + SLASH
- + invocationId;
+ + slash
+ + invocationId;
}
public SpServiceTagPrefix getServiceTagPrefix() {
diff --git a/streampipes-service-discovery-consul/pom.xml b/streampipes-service-discovery-consul/pom.xml
index 2d79d0eb0..a2fef9f59 100644
--- a/streampipes-service-discovery-consul/pom.xml
+++ b/streampipes-service-discovery-consul/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -54,4 +55,26 @@
<artifactId>fluent-hc</artifactId>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
index c289ec2ef..d8d2d642b 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulHealthServiceManager.java
@@ -17,12 +17,13 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+
import com.orbitz.consul.Consul;
import com.orbitz.consul.HealthClient;
import com.orbitz.consul.cache.ServiceHealthCache;
import com.orbitz.consul.model.health.ServiceHealth;
import com.orbitz.consul.option.QueryOptions;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,10 +39,9 @@ public enum ConsulHealthServiceManager {
INSTANCE;
private static final Logger LOG = LoggerFactory.getLogger(ConsulHealthServiceManager.class);
-
+ private static final int MAX_RETRIES = 3;
private final Consul client;
private final Map<String, ServiceHealthCache> serviceHealthCaches;
- private final int MAX_RETRIES = 3;
ConsulHealthServiceManager() {
serviceHealthCaches = new HashMap<>();
@@ -72,11 +72,12 @@ public enum ConsulHealthServiceManager {
List<ServiceHealth> activeServices = findService(serviceGroup, 0);
return activeServices
- .stream()
- .filter(service -> allFiltersSupported(service, filterByTags))
- .filter(service -> !restrictToHealthy || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
- .map(this::makeServiceUrl)
- .collect(Collectors.toList());
+ .stream()
+ .filter(service -> allFiltersSupported(service, filterByTags))
+ .filter(service -> !restrictToHealthy
+ || service.getChecks().stream().allMatch(check -> check.getStatus().equals("passing")))
+ .map(this::makeServiceUrl)
+ .collect(Collectors.toList());
}
private String makeServiceUrl(ServiceHealth service) {
@@ -91,14 +92,14 @@ public enum ConsulHealthServiceManager {
private List<ServiceHealth> findService(String serviceGroup, int retryCount) {
if (serviceHealthCaches.containsKey(serviceGroup)
- && serviceHealthCaches.get(serviceGroup).getMap() != null) {
+ && serviceHealthCaches.get(serviceGroup).getMap() != null) {
ServiceHealthCache cache = serviceHealthCaches.get(serviceGroup);
return cache
- .getMap()
- .values()
- .stream()
- .filter((value) -> value.getService().getService().equals(serviceGroup))
- .collect(Collectors.toList());
+ .getMap()
+ .values()
+ .stream()
+ .filter((value) -> value.getService().getService().equals(serviceGroup))
+ .collect(Collectors.toList());
} else {
if (retryCount < MAX_RETRIES) {
try {
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index 76e5c7b2d..49d860555 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -17,8 +17,9 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
-import com.orbitz.consul.Consul;
import org.apache.streampipes.commons.constants.Envs;
+
+import com.orbitz.consul.Consul;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class ConsulProvider {
e.printStackTrace();
}
}
- } while(!connected);
+ } while (!connected);
LOG.info("Successfully connected to Consul");
return Consul.builder().withUrl(consulURL()).build();
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index 0792e8f5c..9f3f6ddec 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -18,204 +18,208 @@
package org.apache.streampipes.svcdiscovery.consul;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.orbitz.consul.Consul;
-import com.orbitz.consul.KeyValueClient;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItemUtils;
import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
public class ConsulSpConfig extends AbstractConsulService implements SpConfig {
- private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
-
- private static final String SLASH = "/";
- public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
-
- private final String serviceName;
- private final KeyValueClient kvClient;
-
- // TODO Implement mechanism to update the client when some configuration parameters change in Consul
- private Map<String, Object> configProps;
-
- public ConsulSpConfig(String serviceName) {
- Consul consul = consulInstance();
- this.kvClient = consul.keyValueClient();
- this.serviceName = serviceName;
- }
-
- @Override
- public <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope) {
- register(key, String.valueOf(defaultValue), ConfigItemUtils.getValueType(defaultValue), description, configurationScope, false);
- }
-
- @Override
- public void register(String key, boolean defaultValue, String description) {
- register(key, Boolean.toString(defaultValue), "xs:boolean", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
- }
-
- @Override
- public void register(String key, int defaultValue, String description) {
- register(key, Integer.toString(defaultValue), "xs:integer", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
- }
-
- @Override
- public void register(String key, double defaultValue, String description) {
- register(key, Double.toString(defaultValue), "xs:double", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
-
- }
-
- @Override
- public void register(String key, String defaultValue, String description) {
- register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
- }
-
- @Override
- public void registerObject(String key, Object defaultValue, String description) {
- Optional<String> i = kvClient.getValueAsString(addSn(key));
- if (!i.isPresent()) {
- kvClient.putValue(addSn(key), toJson(defaultValue));
- }
- }
-
- @Override
- public void registerPassword(String key, String defaultValue, String description) {
- register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, true);
- }
-
- @Override
- public void register(ConfigItem configItem) {
- String key = addSn(configItem.getKey());
- Optional<String> i = kvClient.getValueAsString(key);
-
- if (!i.isPresent()) {
- // Set the value of environment variable as default
- String envVariable = System.getenv(configItem.getKey());
- if (envVariable != null) {
- configItem.setValue(envVariable);
- kvClient.putValue(key, toJson(configItem));
- } else {
- kvClient.putValue(key, toJson(configItem));
- }
- }
- }
-
- private void register(String key, String defaultValue, String valueType, String description, ConfigurationScope configurationScope, boolean isPassword) {
- ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
- register(configItem);
-
- if (configProps != null) {
- configProps.put(key, getString(key));
- }
- }
-
- @Override
- public boolean getBoolean(String key) {
- return Boolean.parseBoolean(getString(key));
- }
-
- @Override
- public int getInteger(String key) {
- return Integer.parseInt(getString(key));
- }
-
- @Override
- public double getDouble(String key) {
- return Double.parseDouble(getString(key));
- }
-
- @Override
- public String getString(String key) {
- return getConfigItem(key).getValue();
- }
-
- @Override
- public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
- Optional<String> os = kvClient.getValueAsString(addSn(key));
- if (os.isPresent()) {
- try {
- return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
- } catch (JsonProcessingException e) {
- LOG.info("Could not deserialize object", e);
- return defaultValue;
- }
- } else {
- return defaultValue;
- }
- }
-
- @Override
- public ConfigItem getConfigItem(String key) {
- Optional<String> os = kvClient.getValueAsString(addSn(key));
-
- return fromJson(os.get());
- }
-
- @Override
- public void setBoolean(String key, Boolean value) {
- setString(key, value.toString());
- }
-
- @Override
- public void setInteger(String key, int value) {
- setString(key, String.valueOf(value));
- }
-
- @Override
- public void setDouble(String key, double value) {
- setString(key, String.valueOf(value));
- }
-
- @Override
- public void setString(String key, String value) {
- kvClient.putValue(addSn(key), value);
- }
-
- @Override
- public void setObject(String key, Object value) {
- kvClient.putValue(addSn(key), toJson(value));
- }
-
- private String addSn(String key) {
- return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
- }
-
- private ConfigItem fromJson(String content) {
- try {
- return JacksonSerializer.getObjectMapper().readValue(content, ConfigItem.class);
- } catch (Exception e) {
- // if old config is used, this is a fallback
- ConfigItem configItem = new ConfigItem();
- configItem.setValue(content);
- return configItem;
- }
- }
-
- private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope, boolean password) {
- ConfigItem configItem = new ConfigItem();
- configItem.setValueType(valueType);
- configItem.setDescription(description);
- configItem.setPassword(password);
- configItem.setConfigurationScope(configurationScope);
-
- return configItem;
- }
-
- private String toJson(Object object) {
- try {
- return JacksonSerializer.getObjectMapper().writeValueAsString(object);
- } catch (JsonProcessingException e) {
- LOG.info("Could not serialize object to JSON", e);
- return "";
- }
- }
+ public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
+ private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class);
+ private static final String SLASH = "/";
+ private final String serviceName;
+ private final KeyValueClient kvClient;
+
+ // TODO Implement mechanism to update the client when some configuration parameters change in Consul
+ private Map<String, Object> configProps;
+
+ public ConsulSpConfig(String serviceName) {
+ Consul consul = consulInstance();
+ this.kvClient = consul.keyValueClient();
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public <T> void register(String key, T defaultValue, String description, ConfigurationScope configurationScope) {
+ register(key, String.valueOf(defaultValue), ConfigItemUtils.getValueType(defaultValue), description,
+ configurationScope, false);
+ }
+
+ @Override
+ public void register(String key, boolean defaultValue, String description) {
+ register(key, Boolean.toString(defaultValue), "xs:boolean", description,
+ ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+ }
+
+ @Override
+ public void register(String key, int defaultValue, String description) {
+ register(key, Integer.toString(defaultValue), "xs:integer", description,
+ ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+ }
+
+ @Override
+ public void register(String key, double defaultValue, String description) {
+ register(key, Double.toString(defaultValue), "xs:double", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG,
+ false);
+
+ }
+
+ @Override
+ public void register(String key, String defaultValue, String description) {
+ register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, false);
+ }
+
+ @Override
+ public void registerObject(String key, Object defaultValue, String description) {
+ Optional<String> i = kvClient.getValueAsString(addSn(key));
+ if (!i.isPresent()) {
+ kvClient.putValue(addSn(key), toJson(defaultValue));
+ }
+ }
+
+ @Override
+ public void registerPassword(String key, String defaultValue, String description) {
+ register(key, defaultValue, "xs:string", description, ConfigurationScope.CONTAINER_STARTUP_CONFIG, true);
+ }
+
+ @Override
+ public void register(ConfigItem configItem) {
+ String key = addSn(configItem.getKey());
+ Optional<String> i = kvClient.getValueAsString(key);
+
+ if (!i.isPresent()) {
+ // Set the value of environment variable as default
+ String envVariable = System.getenv(configItem.getKey());
+ if (envVariable != null) {
+ configItem.setValue(envVariable);
+ kvClient.putValue(key, toJson(configItem));
+ } else {
+ kvClient.putValue(key, toJson(configItem));
+ }
+ }
+ }
+
+ private void register(String key, String defaultValue, String valueType, String description,
+ ConfigurationScope configurationScope, boolean isPassword) {
+ ConfigItem configItem = ConfigItem.from(key, defaultValue, description, valueType, configurationScope, isPassword);
+ register(configItem);
+
+ if (configProps != null) {
+ configProps.put(key, getString(key));
+ }
+ }
+
+ @Override
+ public boolean getBoolean(String key) {
+ return Boolean.parseBoolean(getString(key));
+ }
+
+ @Override
+ public int getInteger(String key) {
+ return Integer.parseInt(getString(key));
+ }
+
+ @Override
+ public double getDouble(String key) {
+ return Double.parseDouble(getString(key));
+ }
+
+ @Override
+ public String getString(String key) {
+ return getConfigItem(key).getValue();
+ }
+
+ @Override
+ public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
+ Optional<String> os = kvClient.getValueAsString(addSn(key));
+ if (os.isPresent()) {
+ try {
+ return JacksonSerializer.getObjectMapper().readValue(os.get(), clazz);
+ } catch (JsonProcessingException e) {
+ LOG.info("Could not deserialize object", e);
+ return defaultValue;
+ }
+ } else {
+ return defaultValue;
+ }
+ }
+
+ @Override
+ public ConfigItem getConfigItem(String key) {
+ Optional<String> os = kvClient.getValueAsString(addSn(key));
+
+ return fromJson(os.get());
+ }
+
+ @Override
+ public void setBoolean(String key, Boolean value) {
+ setString(key, value.toString());
+ }
+
+ @Override
+ public void setInteger(String key, int value) {
+ setString(key, String.valueOf(value));
+ }
+
+ @Override
+ public void setDouble(String key, double value) {
+ setString(key, String.valueOf(value));
+ }
+
+ @Override
+ public void setString(String key, String value) {
+ kvClient.putValue(addSn(key), value);
+ }
+
+ @Override
+ public void setObject(String key, Object value) {
+ kvClient.putValue(addSn(key), toJson(value));
+ }
+
+ private String addSn(String key) {
+ return SERVICE_ROUTE_PREFIX + serviceName + SLASH + key;
+ }
+
+ private ConfigItem fromJson(String content) {
+ try {
+ return JacksonSerializer.getObjectMapper().readValue(content, ConfigItem.class);
+ } catch (Exception e) {
+ // if old config is used, this is a fallback
+ ConfigItem configItem = new ConfigItem();
+ configItem.setValue(content);
+ return configItem;
+ }
+ }
+
+ private ConfigItem prepareConfigItem(String valueType, String description, ConfigurationScope configurationScope,
+ boolean password) {
+ ConfigItem configItem = new ConfigItem();
+ configItem.setValueType(valueType);
+ configItem.setDescription(description);
+ configItem.setPassword(password);
+ configItem.setConfigurationScope(configurationScope);
+
+ return configItem;
+ }
+
+ private String toJson(Object object) {
+ try {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ LOG.info("Could not serialize object to JSON", e);
+ return "";
+ }
+ }
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
index d54277c8a..0138114a5 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulKvManagement.java
@@ -17,14 +17,15 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.ConsulResponse;
import com.orbitz.consul.model.kv.Value;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.svcdiscovery.api.ISpKvManagement;
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +42,10 @@ public class SpConsulKvManagement extends ConsulProvider implements ISpKvManagem
public <T> T getValueForRoute(String route, Class<T> type) {
try {
String entry = getKeyValue(route)
- .values()
- .stream()
- .findFirst()
- .orElse(null);
+ .values()
+ .stream()
+ .findFirst()
+ .orElse(null);
if (type.equals(Integer.class)) {
return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 3cc6984b7..8156e2545 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -17,6 +17,13 @@
*/
package org.apache.streampipes.svcdiscovery.consul;
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
import com.orbitz.consul.AgentClient;
import com.orbitz.consul.Consul;
import com.orbitz.consul.model.agent.ImmutableRegCheck;
@@ -24,8 +31,6 @@ import com.orbitz.consul.model.agent.ImmutableRegistration;
import com.orbitz.consul.model.agent.Registration;
import com.orbitz.consul.model.health.HealthCheck;
import com.orbitz.consul.model.health.Service;
-import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,14 +63,14 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
public List<String> getActivePipelineElementEndpoints() {
LOG.info("Discovering active pipeline element service endpoints");
return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
- Collections.singletonList(DefaultSpServiceTags.PE.asString()));
+ Collections.singletonList(DefaultSpServiceTags.PE.asString()));
}
@Override
public List<String> getActiveConnectWorkerEndpoints() {
LOG.info("Discovering active StreamPipes Connect worker service endpoints");
return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
- Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
+ Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
}
@Override
@@ -100,11 +105,13 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
}
private boolean hasExtensionsTag(List<String> tags) {
- return tags.stream().anyMatch(tag -> tag.equals(DefaultSpServiceTags.PE.asString()) || tag.equals(DefaultSpServiceTags.CONNECT_WORKER.asString()));
+ return tags.stream().anyMatch(tag -> tag.equals(DefaultSpServiceTags.PE.asString())
+ || tag.equals(DefaultSpServiceTags.CONNECT_WORKER.asString()));
}
private String extractServiceGroup(List<String> tags) {
- String groupTag = tags.stream().filter(tag -> tag.startsWith(SpServiceTagPrefix.SP_GROUP.asString())).findFirst().orElse("unknown service group");
+ String groupTag = tags.stream().filter(tag -> tag.startsWith(SpServiceTagPrefix.SP_GROUP.asString())).findFirst()
+ .orElse("unknown service group");
return groupTag.replaceAll(SpServiceTagPrefix.SP_GROUP.asString() + ":", "");
}
@@ -117,18 +124,18 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
private Registration createRegistrationBody(SpServiceRegistrationRequest req) {
return ImmutableRegistration.builder()
- .id(req.getSvcId())
- .name(req.getSvcGroup())
- .port(req.getPort())
- .address(HTTP_PROTOCOL + req.getHost())
- .check(ImmutableRegCheck.builder()
- .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
- .interval(HEALTH_CHECK_INTERVAL)
- .deregisterCriticalServiceAfter("120s")
- .status("passing")
- .build())
- .tags(asString(req.getTags()))
- .enableTagOverride(true)
- .build();
+ .id(req.getSvcId())
+ .name(req.getSvcGroup())
+ .port(req.getPort())
+ .address(HTTP_PROTOCOL + req.getHost())
+ .check(ImmutableRegCheck.builder()
+ .http(HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath())
+ .interval(HEALTH_CHECK_INTERVAL)
+ .deregisterCriticalServiceAfter("120s")
+ .status("passing")
+ .build())
+ .tags(asString(req.getTags()))
+ .enableTagOverride(true)
+ .build();
}
}
diff --git a/streampipes-service-discovery/pom.xml b/streampipes-service-discovery/pom.xml
index b4a26be1e..41a47aac8 100644
--- a/streampipes-service-discovery/pom.xml
+++ b/streampipes-service-discovery/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -39,6 +40,25 @@
<version>0.71.0-SNAPSHOT</version>
</dependency>
</dependencies>
-
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
index 2a292c87c..8f836b077 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscovery.java
@@ -29,7 +29,7 @@ public class SpServiceDiscovery {
public static ISpServiceDiscovery getServiceDiscovery() {
return new SpConsulServiceDiscovery();
}
-
+
public static ISpKvManagement getKeyValueStore() {
return new SpConsulKvManagement();
}
diff --git a/streampipes-service-extensions-base/pom.xml b/streampipes-service-extensions-base/pom.xml
index 5570442fe..0095048bc 100644
--- a/streampipes-service-extensions-base/pom.xml
+++ b/streampipes-service-extensions-base/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -50,4 +51,26 @@
<maven.compiler.target>8</maven.compiler.target>
</properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
index 19b02f3b8..0aedabe10 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -18,78 +18,83 @@
package org.apache.streampipes.service.extensions.base;
-import org.apache.streampipes.service.base.BaseNetworkingConfig;
-import org.apache.streampipes.service.base.StreamPipesServiceBase;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.service.base.BaseNetworkingConfig;
+import org.apache.streampipes.service.base.StreamPipesServiceBase;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
+
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServiceBase {
- private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
-
- public void init() {
- SpServiceDefinition serviceDef = provideServiceDefinition();
- init(serviceDef);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
+
+ public void init() {
+ SpServiceDefinition serviceDef = provideServiceDefinition();
+ init(serviceDef);
+ }
+
+ public void init(SpServiceDefinition serviceDef) {
+ try {
+ BaseNetworkingConfig networkingConfig = BaseNetworkingConfig.defaultResolution(serviceDef.getDefaultPort());
+ String serviceId = serviceDef.getServiceGroup() + "-" + AUTO_GENERATED_SERVICE_ID;
+ serviceDef.setServiceId(serviceId);
+ DeclarersSingleton.getInstance().populate(networkingConfig.getHost(), networkingConfig.getPort(), serviceDef);
+
+ startExtensionsService(this.getClass(), serviceDef, networkingConfig);
+ } catch (UnknownHostException e) {
+ LOG.error(
+ "Could not auto-resolve host address - "
+ + "please manually provide the hostname using the SP_HOST environment variable");
}
-
- public void init(SpServiceDefinition serviceDef) {
- try {
- BaseNetworkingConfig networkingConfig = BaseNetworkingConfig.defaultResolution(serviceDef.getDefaultPort());
- String serviceId = serviceDef.getServiceGroup() + "-" + AUTO_GENERATED_SERVICE_ID;
- serviceDef.setServiceId(serviceId);
- DeclarersSingleton.getInstance().populate(networkingConfig.getHost(), networkingConfig.getPort(), serviceDef);
-
- startExtensionsService(this.getClass(), serviceDef, networkingConfig);
- } catch (UnknownHostException e) {
- LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable");
- }
+ }
+
+ public SpServiceDefinition provideServiceDefinition() {
+ return null;
+ }
+
+ public abstract void afterServiceRegistered(SpServiceDefinition serviceDef);
+
+ public void startExtensionsService(Class<?> serviceClass,
+ SpServiceDefinition serviceDef,
+ BaseNetworkingConfig networkingConfig) throws UnknownHostException {
+ this.startStreamPipesService(
+ serviceClass,
+ DefaultSpServiceGroups.EXT,
+ serviceId(),
+ networkingConfig
+ );
+ this.afterServiceRegistered(serviceDef);
+ }
+
+ @Override
+ protected List<SpServiceTag> getServiceTags() {
+ List<SpServiceTag> tags = new ArrayList<>();
+ if (DeclarersSingleton.getInstance().getServiceDefinition() != null) {
+ tags.add(SpServiceTag.create(SpServiceTagPrefix.SP_GROUP,
+ DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup()));
}
+ tags.addAll(getExtensionsServiceTags());
+ return tags;
+ }
- public SpServiceDefinition provideServiceDefinition() {
- return null;
- }
-
- public abstract void afterServiceRegistered(SpServiceDefinition serviceDef);
-
- public void startExtensionsService(Class<?> serviceClass,
- SpServiceDefinition serviceDef,
- BaseNetworkingConfig networkingConfig) throws UnknownHostException {
- this.startStreamPipesService(
- serviceClass,
- DefaultSpServiceGroups.EXT,
- serviceId(),
- networkingConfig
- );
- this.afterServiceRegistered(serviceDef);
- }
+ protected abstract List<SpServiceTag> getExtensionsServiceTags();
- @Override
- protected List<SpServiceTag> getServiceTags() {
- List<SpServiceTag> tags = new ArrayList<>();
- if (DeclarersSingleton.getInstance().getServiceDefinition() != null) {
- tags.add(SpServiceTag.create(SpServiceTagPrefix.SP_GROUP, DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup()));
- }
- tags.addAll(getExtensionsServiceTags());
- return tags;
- }
+ @PreDestroy
+ public abstract void onExit();
- protected abstract List<SpServiceTag> getExtensionsServiceTags();
-
- @PreDestroy
- public abstract void onExit();
-
- public String serviceId() {
- return DeclarersSingleton.getInstance().getServiceId();
- }
+ public String serviceId() {
+ return DeclarersSingleton.getInstance().getServiceId();
+ }
}
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/WebSecurityConfig.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/WebSecurityConfig.java
index ada8748ea..6eaa7cf6a 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/WebSecurityConfig.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/WebSecurityConfig.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.service.base.security.UnauthorizedRequestEntryPoint;
import org.apache.streampipes.service.extensions.base.security.TokenAuthenticationFilter;
import org.apache.streampipes.service.extensions.base.security.UnauthenticatedInterfaces;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -56,23 +57,24 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
if (isAnonymousAccess()) {
http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
- .and()
- .csrf().disable()
- .formLogin().disable()
- .httpBasic().disable().authorizeRequests().antMatchers("/**").permitAll();
+ .and()
+ .csrf().disable()
+ .formLogin().disable()
+ .httpBasic().disable().authorizeRequests().antMatchers("/**").permitAll();
} else {
http
- .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
- .and()
- .csrf().disable()
- .formLogin().disable()
- .httpBasic().disable()
- .exceptionHandling()
- .authenticationEntryPoint(new UnauthorizedRequestEntryPoint())
- .and()
- .authorizeRequests()
- .antMatchers(UnauthenticatedInterfaces.get().toArray(new String[0])).permitAll()
- .anyRequest().authenticated().and().addFilterBefore(tokenAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class);
+ .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
+ .and()
+ .csrf().disable()
+ .formLogin().disable()
+ .httpBasic().disable()
+ .exceptionHandling()
+ .authenticationEntryPoint(new UnauthorizedRequestEntryPoint())
+ .and()
+ .authorizeRequests()
+ .antMatchers(UnauthenticatedInterfaces.get().toArray(new String[0])).permitAll()
+ .anyRequest().authenticated().and()
+ .addFilterBefore(tokenAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class);
}
}
@@ -82,8 +84,9 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
LOG.info("Configured service for authenticated access mode");
return false;
} else {
- LOG.warn("No env variable {} provided, which is required for authenticated access. Defaulting to anonymous access.",
- Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
+ LOG.warn(
+ "No env variable {} provided, which is required for authenticated access. Defaulting to anonymous access.",
+ Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
return true;
}
} else {
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/client/StreamPipesClientRuntimeConnectionResolver.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/client/StreamPipesClientRuntimeConnectionResolver.java
index d90b8f963..06f5ea45b 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/client/StreamPipesClientRuntimeConnectionResolver.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/client/StreamPipesClientRuntimeConnectionResolver.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.commons.networking.Networking;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,11 +84,11 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect
private List<String> findClientServices() {
return SpServiceDiscovery
- .getServiceDiscovery()
- .getServiceEndpoints(
- DefaultSpServiceGroups.CORE,
- true,
- Collections.singletonList(DefaultSpServiceTags.STREAMPIPES_CLIENT.asString())
- );
+ .getServiceDiscovery()
+ .getServiceEndpoints(
+ DefaultSpServiceGroups.CORE,
+ true,
+ Collections.singletonList(DefaultSpServiceTags.STREAMPIPES_CLIENT.asString())
+ );
}
}
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/TokenAuthenticationFilter.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/TokenAuthenticationFilter.java
index 3b50f1c87..a8bb0630c 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/TokenAuthenticationFilter.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/TokenAuthenticationFilter.java
@@ -19,13 +19,14 @@
package org.apache.streampipes.service.extensions.base.security;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import io.jsonwebtoken.Claims;
import org.apache.streampipes.commons.constants.HttpConstants;
import org.apache.streampipes.model.UserInfo;
import org.apache.streampipes.security.jwt.JwtTokenUtils;
import org.apache.streampipes.security.jwt.JwtTokenValidator;
import org.apache.streampipes.security.jwt.PublicKeyResolver;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.jsonwebtoken.Claims;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.web.authentication.WebAuthenticationDetailsSource;
@@ -36,6 +37,7 @@ import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/UnauthenticatedInterfaces.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/UnauthenticatedInterfaces.java
index 86e443ae4..f6c8bb083 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/UnauthenticatedInterfaces.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/security/UnauthenticatedInterfaces.java
@@ -27,12 +27,12 @@ public class UnauthenticatedInterfaces {
public static Collection<String> get() {
return Arrays.asList(
- "/svchealth/*",
- "/",
- "/sec/**",
- "/sepa/**",
- "/stream/**",
- "/api/v1/worker/**"
+ "/svchealth/*",
+ "/",
+ "/sec/**",
+ "/sepa/**",
+ "/stream/**",
+ "/api/v1/worker/**"
);
}
}