You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/02/04 12:31:58 UTC
[streampipes] branch 1223-harmonize-handling-of-environment-variables updated: Provide environment variables via Environment provider (#1223)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1223-harmonize-handling-of-environment-variables
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1223-harmonize-handling-of-environment-variables by this push:
new 917f019ce Provide environment variables via Environment provider (#1223)
917f019ce is described below
commit 917f019ce1dba415e5077c2e87d5c10a04154503
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sat Feb 4 13:31:40 2023 +0100
Provide environment variables via Environment provider (#1223)
---
.../streampipes/commons/constants/CustomEnvs.java | 20 ++++-
.../commons/constants/DefaultEnvValues.java | 4 +-
.../apache/streampipes/commons/constants/Envs.java | 18 ++---
.../commons/environment/DefaultEnvironment.java | 70 ++++++++++++++++++
.../commons/environment/Environment.java | 33 +++++++++
.../commons/environment/Environments.java | 16 +++-
.../environment/variable/EnvironmentVariable.java | 8 ++
.../streampipes/config/backend/BackendConfig.java | 85 ++--------------------
...StreamPipesClientRuntimeConnectionResolver.java | 21 ++----
.../management/connect/adapter/Adapter.java | 16 ----
.../elements/SendToBrokerAdapterSink.java | 15 ++--
.../elements/SendToJmsAdapterSink.java | 4 +-
.../elements/SendToKafkaAdapterSink.java | 6 +-
.../elements/SendToMqttAdapterSink.java | 4 +-
.../elements/SendToNatsAdapterSink.java | 4 +-
.../simulator/simulator/VehicleDataSimulator.java | 13 +++-
.../simulator/utils/WatertankDataSimulator.java | 13 +++-
.../messaging/kafka/SpKafkaProducer.java | 6 +-
.../runtime/PipelineElementRuntimeInfoFetcher.java | 13 +++-
.../manager/setup/AutoInstallation.java | 56 ++++++--------
.../resource/management/UserResourceManager.java | 11 ++-
.../pe/InvocablePipelineElementResource.java | 4 +-
.../streampipes/security/jwt/KeyGenerator.java | 5 +-
.../service/core/StreamPipesEnvChecker.java | 40 ++++++----
.../extensions/security/WebSecurityConfig.java | 12 ++-
.../encryption/SecretEncryptionManager.java | 7 +-
.../user/management/jwt/JwtTokenProvider.java | 7 +-
.../standalone/function/StreamPipesFunction.java | 10 ++-
28 files changed, 306 insertions(+), 215 deletions(-)
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
index 514c2e5b5..c4cdb950b 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
@@ -17,14 +17,16 @@
*/
package org.apache.streampipes.commons.constants;
+import java.util.Map;
+
public class CustomEnvs {
public static boolean exists(String envVariable) {
- return System.getenv().containsKey(envVariable);
+ return AllEnvs.INSTANCE.getEnvs().containsKey(envVariable);
}
public static String getEnv(String envVariable) {
- return System.getenv(envVariable);
+ return AllEnvs.INSTANCE.getEnvs().get(envVariable);
}
public static Integer getEnvAsInt(String envVariable) {
@@ -34,4 +36,18 @@ public class CustomEnvs {
public static Boolean getEnvAsBoolean(String envVariable) {
return Boolean.parseBoolean(getEnv(envVariable));
}
+
+ private enum AllEnvs {
+ INSTANCE;
+
+ private final Map<String, String> envs;
+
+ AllEnvs() {
+ this.envs = System.getenv();
+ }
+
+ public Map<String, String> getEnvs() {
+ return envs;
+ }
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
index 590abb902..e80091589 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
@@ -29,7 +29,9 @@ public class DefaultEnvValues {
public static final int CONSUL_PORT_DEFAULT = 8500;
public static final int MAX_WAIT_TIME_AT_SHUTDOWN_DEFAULT = 10000;
- public static final boolean INSTALL_PIPELINE_ELEMENTS = true;
+ public static final String INSTALL_PIPELINE_ELEMENTS = "true";
public static final String DEFAULT_ENCRYPTION_PASSCODE = "eGgemyGBoILAu3xckoIp";
+
+ public static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000";
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 067df3dea..e8f795fcf 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -27,20 +27,20 @@ public enum Envs {
SP_CONSUL_HOST("SP_CONSUL_HOST", "consul", "localhost"),
SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"),
- SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", null),
+ SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", DefaultEnvValues.SP_KAFKA_RETENTION_MS_DEFAULT),
SP_JWT_SECRET("JWT_SECRET", null),
SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null),
SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null),
SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null),
- SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null),
- SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null),
- SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null),
- SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null),
- SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null),
+ SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT),
+ SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT),
+ SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT),
+ SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT),
+ SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS),
SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null),
- SP_CLIENT_USER("SP_CLIENT_USER", null),
- SP_CLIENT_SECRET("SP_CLIENT_SECRET", null),
- SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null),
+ SP_CLIENT_USER("SP_CLIENT_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT),
+ SP_CLIENT_SECRET("SP_CLIENT_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT),
+ SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE),
SP_DEBUG("SP_DEBUG", "false"),
SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null),
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 8ccf01948..2d11dda01 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -94,6 +94,76 @@ public class DefaultEnvironment implements Environment {
return new StringEnvironmentVariable(Envs.SP_COUCHDB_PASSWORD);
}
+ @Override
+ public StringEnvironmentVariable getClientUser() {
+ return new StringEnvironmentVariable(Envs.SP_CLIENT_USER);
+ }
+
+ @Override
+ public StringEnvironmentVariable getClientSecret() {
+ return new StringEnvironmentVariable(Envs.SP_CLIENT_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtSecret() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtPublicKeyLoc() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_PUBLIC_KEY_LOC);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtPrivateKeyLoc() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_PRIVATE_KEY_LOC);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtSigningMode() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_SIGNING_MODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getExtensionsAuthMode() {
+ return new StringEnvironmentVariable(Envs.SP_EXT_AUTH_MODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getEncryptionPasscode() {
+ return new StringEnvironmentVariable(Envs.SP_ENCRYPTION_PASSCODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKafkaRetentionTimeMs() {
+ return new StringEnvironmentVariable(Envs.SP_KAFKA_RETENTION_MS);
+ }
+
+ @Override
+ public BooleanEnvironmentVariable getSetupInstallPipelineElements() {
+ return new BooleanEnvironmentVariable(Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialServiceUserSecret() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialServiceUser() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialAdminEmail() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_EMAIL);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialAdminPassword() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_PASSWORD);
+ }
+
@Override
public StringEnvironmentVariable getConsulLocation() {
return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 63be042c3..0da05b0d4 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -60,4 +60,37 @@ public interface Environment {
StringEnvironmentVariable getCouchDbPassword();
+
+ // JWT & Authentication
+
+ StringEnvironmentVariable getClientUser();
+
+ StringEnvironmentVariable getClientSecret();
+
+ StringEnvironmentVariable getJwtSecret();
+
+ StringEnvironmentVariable getJwtPublicKeyLoc();
+
+ StringEnvironmentVariable getJwtPrivateKeyLoc();
+
+ StringEnvironmentVariable getJwtSigningMode();
+
+ StringEnvironmentVariable getExtensionsAuthMode();
+
+ StringEnvironmentVariable getEncryptionPasscode();
+
+ // Messaging
+ StringEnvironmentVariable getKafkaRetentionTimeMs();
+
+
+ // Setup
+ BooleanEnvironmentVariable getSetupInstallPipelineElements();
+
+ StringEnvironmentVariable getInitialServiceUserSecret();
+
+ StringEnvironmentVariable getInitialServiceUser();
+
+ StringEnvironmentVariable getInitialAdminEmail();
+
+ StringEnvironmentVariable getInitialAdminPassword();
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
index 631e72dcc..c25d98bbb 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
@@ -21,6 +21,20 @@ package org.apache.streampipes.commons.environment;
public class Environments {
public static Environment getEnvironment() {
- return new DefaultEnvironment();
+ return Env.DEFAULT.getEnvironment();
+ }
+
+ private enum Env {
+ DEFAULT(new DefaultEnvironment());
+
+ private final Environment environment;
+
+ Env(Environment env) {
+ this.environment = env;
+ }
+
+ public Environment getEnvironment() {
+ return environment;
+ }
}
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
index e02031df4..b80e9bdc6 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
@@ -59,6 +59,14 @@ public abstract class EnvironmentVariable<T> {
return resolver.resolve();
}
+ public T getDefault() {
+ return this.defaultValue;
+ }
+
+ public String getEnvVariableName() {
+ return this.envVariableName;
+ }
+
private boolean isDevModeActive() {
return CustomEnvs.getEnvAsBoolean(Envs.SP_DEBUG.getEnvVariableName());
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 43e602362..ce13e42d5 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -19,7 +19,8 @@
package org.apache.streampipes.config.backend;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.random.TokenGenerator;
import org.apache.streampipes.config.backend.model.EmailConfig;
import org.apache.streampipes.config.backend.model.GeneralConfig;
@@ -59,9 +60,6 @@ public enum BackendConfig {
config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
config.register(BackendConfigKeys.ZOOKEEPER_HOST, "zookeeper", "Hostname for backend service for zookeeper");
config.register(BackendConfigKeys.ZOOKEEPER_PORT, 2181, "Port for backend service for zookeeper");
- config.register(BackendConfigKeys.ELASTICSEARCH_HOST, "elasticsearch", "Hostname for elasticsearch service");
- config.register(BackendConfigKeys.ELASTICSEARCH_PORT, 9200, "Port for elasticsearch service");
- config.register(BackendConfigKeys.ELASTICSEARCH_PROTOCOL, "http", "Protocol the elasticsearch service");
config.register(BackendConfigKeys.IS_CONFIGURED, false,
"Boolean that indicates whether streampipes is " + "already configured or not");
config.register(BackendConfigKeys.IS_SETUP_RUNNING, false,
@@ -108,22 +106,6 @@ public enum BackendConfig {
false, false, possibleCharacters, new SecureRandom());
}
- public String getBackendHost() {
- return config.getString(BackendConfigKeys.BACKEND_HOST);
- }
-
- public int getBackendPort() {
- return config.getInteger(BackendConfigKeys.BACKEND_PORT);
- }
-
- public String getBackendUrl() {
- return "http://" + getBackendHost() + ":" + getBackendPort();
- }
-
- public String getBackendApiUrl() {
- return getBackendUrl() + "/streampipes-backend/";
- }
-
public String getJmsHost() {
return config.getString(BackendConfigKeys.JMS_HOST);
}
@@ -156,10 +138,6 @@ public enum BackendConfig {
return config.getInteger(BackendConfigKeys.KAFKA_PORT);
}
- public String getKafkaUrl() {
- return getKafkaHost() + ":" + getKafkaPort();
- }
-
public String getZookeeperHost() {
return config.getString(BackendConfigKeys.ZOOKEEPER_HOST);
}
@@ -189,30 +167,6 @@ public enum BackendConfig {
config.setBoolean(BackendConfigKeys.IS_CONFIGURED, b);
}
- public String getElasticsearchHost() {
- return config.getString(BackendConfigKeys.ELASTICSEARCH_HOST);
- }
-
- public int getElasticsearchPort() {
- return config.getInteger(BackendConfigKeys.ELASTICSEARCH_PORT);
- }
-
- public String getElasticsearchProtocol() {
- return config.getString(BackendConfigKeys.ELASTICSEARCH_PROTOCOL);
- }
-
- public String getKafkaRestHost() {
- return config.getString(BackendConfigKeys.KAFKA_REST_HOST);
- }
-
- public Integer getKafkaRestPort() {
- return config.getInteger(BackendConfigKeys.KAFKA_REST_PORT);
- }
-
- public String getKafkaRestUrl() {
- return "http://" + getKafkaRestHost() + ":" + getKafkaRestPort();
- }
-
public String getAssetDir() {
return config.getString(BackendConfigKeys.ASSETS_DIR);
}
@@ -221,26 +175,6 @@ public enum BackendConfig {
return config.getString(BackendConfigKeys.FILES_DIR);
}
- public String getDatalakeHost() {
- return config.getString(BackendConfigKeys.DATA_LAKE_HOST);
- }
-
- public int getDatalakePort() {
- return config.getInteger(BackendConfigKeys.DATA_LAKE_PORT);
- }
-
- public String getDataLakeUrl() {
- return getDatalakeHost() + ":" + getDatalakePort();
- }
-
- public String getInfluxHost() {
- return config.getString(BackendConfigKeys.INFLUX_HOST);
- }
-
- public int getInfluxPort() {
- return config.getInteger(BackendConfigKeys.INFLUX_PORT);
- }
-
public String getInfluxDatabaseName() {
return config.getString(BackendConfigKeys.INFLUX_DATA_BASE);
}
@@ -270,24 +204,21 @@ public enum BackendConfig {
config.setObject(BackendConfigKeys.LOCAL_AUTH_CONFIG, authConfig);
}
- public boolean isSetupRunning() {
- return config.getBoolean(BackendConfigKeys.IS_SETUP_RUNNING);
- }
-
public void updateSetupStatus(boolean status) {
config.setBoolean(BackendConfigKeys.IS_SETUP_RUNNING, status);
}
private String getJwtSecret() {
- if (Envs.SP_JWT_SECRET.exists()) {
- return Envs.SP_JWT_SECRET.getValue();
- } else {
- return makeDefaultJwtSecret();
- }
+ var env = getEnvironment();
+ return env.getJwtSecret().getValueOrResolve(this::makeDefaultJwtSecret);
}
private String makeDefaultJwtSecret() {
return TokenGenerator.generateNewToken();
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
index 8bb4d28ae..583bb5ab3 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
@@ -20,8 +20,8 @@ package org.apache.streampipes.extensions.management.client;
import org.apache.streampipes.client.credentials.CredentialsProvider;
import org.apache.streampipes.client.credentials.StreamPipesTokenCredentials;
import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.commons.networking.Networking;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
@@ -38,9 +38,10 @@ import java.util.List;
public class StreamPipesClientRuntimeConnectionResolver implements ClientConnectionUrlResolver {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesClientRuntimeConnectionResolver.class);
+ private Environment env;
public StreamPipesClientRuntimeConnectionResolver() {
-
+ this.env = Environments.getEnvironment();
}
@Override
@@ -52,7 +53,7 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect
public String getBaseUrl() throws SpRuntimeException {
List<String> baseUrls = findClientServices();
if (baseUrls.size() > 0) {
- if (Envs.SP_DEBUG.exists()) {
+ if (env.getSpDebug().getValueOrDefault()) {
try {
return "http://" + Networking.getHostname() + ":" + 8030;
} catch (UnknownHostException e) {
@@ -67,19 +68,11 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect
}
private String getClientApiUser() {
- if (Envs.SP_CLIENT_USER.exists()) {
- return Envs.SP_CLIENT_USER.getValue();
- } else {
- return DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT;
- }
+ return env.getClientUser().getValueOrDefault();
}
private String getClientApiSecret() {
- if (Envs.SP_CLIENT_SECRET.exists()) {
- return Envs.SP_CLIENT_SECRET.getValue();
- } else {
- return DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT;
- }
+ return env.getClientSecret().getValueOrDefault();
}
private List<String> findClientServices() {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
index 4d87918d6..1f797ab93 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
@@ -48,33 +48,17 @@ public abstract class Adapter<T extends AdapterDescription> implements IAdapter<
@Override
public void changeEventGrounding(TransportProtocol transportProtocol) {
-
if (transportProtocol instanceof JmsTransportProtocol) {
SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- //((JmsTransportProtocol) transportProtocol).setPort(61616);
- }
sink.changeTransportProtocol((JmsTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof KafkaTransportProtocol) {
SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
- }
sink.changeTransportProtocol((KafkaTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof MqttTransportProtocol) {
SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- //((MqttTransportProtocol) transportProtocol).setPort(1883);
- }
sink.changeTransportProtocol((MqttTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof NatsTransportProtocol) {
SendToNatsAdapterSink sink = (SendToNatsAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- }
sink.changeTransportProtocol((NatsTransportProtocol) transportProtocol);
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
index 3d2109a4e..7b7c653b8 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
@@ -48,8 +50,8 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
.getEventGrounding()
.getTransportProtocol());
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- modifyProtocolForDebugging();
+ if (getEnvironment().getSpDebug().getValueOrDefault()) {
+ modifyProtocolForDebugging(this.protocol);
}
TransportFormat transportFormat = adapterDescription
@@ -88,12 +90,11 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
producer.publish(event);
}
- protected void modifyProtocolForDebugging() {
-
- }
+ public abstract void modifyProtocolForDebugging(T transportProtocol);
public void changeTransportProtocol(T transportProtocol) {
try {
+ modifyProtocolForDebugging(transportProtocol);
producer.disconnect();
producer.connect(transportProtocol);
} catch (SpRuntimeException e) {
@@ -101,6 +102,10 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
}
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
index a392f6c7f..17f954769 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
@@ -30,7 +30,7 @@ public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportPr
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(JmsTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
index b92dc711a..a619140f5 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
@@ -30,8 +30,8 @@ public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTranspo
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
- this.protocol.setKafkaPort(9094);
+ public void modifyProtocolForDebugging(KafkaTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
+ protocol.setKafkaPort(9094);
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
index 62351bf89..04a481434 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
@@ -30,7 +30,7 @@ public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransport
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(MqttTransportProtocol transportProtocol) {
+ transportProtocol.setBrokerHostname("localhost"); // rewrite the protocol handed in by the caller
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
index 89b7471a2..932183c18 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
@@ -31,7 +31,7 @@ public class SendToNatsAdapterSink extends SendToBrokerAdapterSink<NatsTransport
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(NatsTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
}
}
diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
index 87622c56d..63a036b6a 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.sources.vehicle.simulator.simulator;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -37,6 +38,12 @@ public class VehicleDataSimulator implements Runnable {
private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json";
+ private Environment env;
+
+ public VehicleDataSimulator() {
+ this.env = Environments.getEnvironment();
+ }
+
private void initSimulation() {
try {
ConfigExtractor configExtractor =
@@ -70,12 +77,12 @@ public class VehicleDataSimulator implements Runnable {
}
private String getKafkaHost(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
}
private Integer getKafkaPort(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
}
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
index 810054d0f..35dd6f2e0 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.sources.watertank.simulator.utils;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -37,6 +38,12 @@ public class WatertankDataSimulator implements Runnable {
private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json";
+ private Environment env;
+
+ public WatertankDataSimulator() {
+ this.env = Environments.getEnvironment();
+ }
+
private void initSimulation() {
try {
ConfigExtractor configExtractor =
@@ -61,12 +68,12 @@ public class WatertankDataSimulator implements Runnable {
}
private String getKafkaHost(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
}
private Integer getKafkaPort(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index afd3d44a7..407f2d028 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.messaging.kafka;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
@@ -49,7 +49,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
private static final String COLON = ":";
- private static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000";
private String brokerUrl;
private String topic;
@@ -125,8 +124,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
if (!topicExists(topics)) {
Map<String, String> topicConfig = new HashMap<>();
- String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists()
- ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
+ String retentionTime = Environments.getEnvironment().getKafkaRetentionTimeMs().getValueOrDefault();
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 08866eb31..2762bad07 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,7 +17,8 @@
*/
package org.apache.streampipes.manager.runtime;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
@@ -43,15 +44,17 @@ public enum PipelineElementRuntimeInfoFetcher {
private static final int FETCH_INTERVAL_MS = 300;
private final Map<String, SpDataFormatConverter> converterMap;
+ private Environment env;
PipelineElementRuntimeInfoFetcher() {
this.converterMap = new HashMap<>();
+ this.env = Environments.getEnvironment();
}
public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
var topic = getOutputTopic(spDataStream);
var protocol = spDataStream.getEventGrounding().getTransportProtocol();
- if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) {
+ if (env.getSpDebug().getValueOrDefault()) {
protocol.setBrokerHostname("localhost");
}
if (!converterMap.containsKey(topic)) {
@@ -143,7 +146,7 @@ public enum PipelineElementRuntimeInfoFetcher {
String topic) throws SpRuntimeException {
final String[] result = {null};
// Change kafka config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (getEnvironment().getSpDebug().getValueOrDefault()) {
protocol.setKafkaPort(9094);
}
@@ -161,4 +164,8 @@ public enum PipelineElementRuntimeInfoFetcher {
return result[0];
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
index 54d5808ed..0cad980a5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
@@ -17,9 +17,9 @@
*/
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.commons.constants.CustomEnvs;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.model.client.setup.InitialSettings;
@@ -33,6 +33,12 @@ public class AutoInstallation {
private static final Logger LOG = LoggerFactory.getLogger(AutoInstallation.class);
+ private Environment env;
+
+ public AutoInstallation() {
+ this.env = Environments.getEnvironment();
+ }
+
public void startAutoInstallation() {
InitialSettings settings = collectInitialSettings();
@@ -64,57 +70,37 @@ public class AutoInstallation {
}
private boolean autoInstallPipelineElements() {
- if (Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.exists()) {
- return Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.getValueAsBoolean();
- } else {
- return DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS;
- }
+ return env.getSetupInstallPipelineElements().getValueOrDefault();
}
private String findServiceAccountSecret() {
- return getStringOrDefault(
- Envs.SP_INITIAL_SERVICE_USER_SECRET.getEnvVariableName(),
- DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT
- );
+ return env.getInitialServiceUserSecret().getValueOrDefault();
}
private String findServiceAccountName() {
- return getStringOrDefault(
- Envs.SP_INITIAL_SERVICE_USER.getEnvVariableName(),
- DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT
- );
+ return env.getInitialServiceUser().getValueOrDefault();
}
private String findAdminUser() {
return getStringOrDefault(
- Envs.SP_INITIAL_ADMIN_EMAIL.getEnvVariableName(),
- DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT
+ env.getInitialAdminEmail()
);
}
private String findAdminPassword() {
return getStringOrDefault(
- Envs.SP_INITIAL_ADMIN_PASSWORD.getEnvVariableName(),
- DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT
+ env.getInitialAdminPassword()
);
}
- private String getStringOrDefault(String envVariable, String defaultValue) {
- boolean exists = exists(envVariable);
- if (exists) {
- LOG.info("Using provided environment variable {}", envVariable);
- return getString(envVariable);
+ private String getStringOrDefault(StringEnvironmentVariable variable) {
+ String name = variable.getEnvVariableName();
+ if (variable.exists()) {
+ LOG.info("Using provided environment variable {}", name);
+ return variable.getValue();
} else {
- LOG.info("Environment variable {} not found, using default value {}", envVariable, defaultValue);
- return defaultValue;
+ LOG.info("Environment variable {} not found, using default value {}", name, variable.getDefault());
+ return variable.getDefault();
}
}
-
- private boolean exists(String envVariable) {
- return CustomEnvs.exists(envVariable);
- }
-
- private String getString(String envVariable) {
- return CustomEnvs.getEnv(envVariable);
- }
}
diff --git a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
index 506ac0e78..b42cbc0cd 100644
--- a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
+++ b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
@@ -18,8 +18,8 @@
package org.apache.streampipes.resource.management;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.UserNotFoundException;
import org.apache.streampipes.commons.exceptions.UsernameAlreadyTakenException;
import org.apache.streampipes.mail.MailSender;
@@ -65,8 +65,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
}
public Principal getServiceAdmin() {
+ var env = getEnvironment();
return db.getServiceAccount(
- Envs.SP_INITIAL_SERVICE_USER.getValueOrDefault(DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT)
+ env.getInitialServiceUser().getValueOrDefault()
);
}
@@ -160,5 +161,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
return StorageDispatcher.INSTANCE.getNoSqlStore().getUserActivationTokenStorage();
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
index fae16db4b..37633c631 100644
--- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
+++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.rest.extensions.pe;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.declarer.Declarer;
@@ -197,7 +197,7 @@ public abstract class InvocablePipelineElementResource<K extends InvocableStream
protected abstract K createGroundingDebugInformation(K graph);
private Boolean isDebug() {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean();
+ return Environments.getEnvironment().getSpDebug().getValueOrDefault();
}
private String getServiceGroup() {
diff --git a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
index ceda5a079..31592b984 100644
--- a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
+++ b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.security.jwt;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
@@ -66,7 +66,8 @@ public class KeyGenerator {
}
public String readKey() throws IOException {
- return Files.readString(Paths.get(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue()), Charset.defaultCharset());
+ var publicKeyLoc = Environments.getEnvironment().getJwtPublicKeyLoc().getValue();
+ return Files.readString(Paths.get(publicKeyLoc), Charset.defaultCharset());
}
public Key makeKeyForRsa(String key) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException {
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
index a2fac136d..64e187c78 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.service.core;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.model.JwtSigningMode;
import org.apache.streampipes.config.backend.model.LocalAuthConfig;
@@ -36,6 +37,12 @@ public class StreamPipesEnvChecker {
BackendConfig coreConfig;
+ private Environment env;
+
+ public StreamPipesEnvChecker() {
+ this.env = Environments.getEnvironment();
+ }
+
public void updateEnvironmentVariables() {
this.coreConfig = BackendConfig.INSTANCE;
@@ -46,37 +53,42 @@ public class StreamPipesEnvChecker {
private void updateJwtSettings() {
LocalAuthConfig localAuthConfig = coreConfig.getLocalAuthConfig();
boolean incompleteConfig = false;
- if (Envs.SP_JWT_SIGNING_MODE.exists()) {
- localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(Envs.SP_JWT_SIGNING_MODE.getValue()));
+ var signingMode = env.getJwtSigningMode();
+ var jwtSecret = env.getJwtSecret();
+ var publicKeyLoc = env.getJwtPublicKeyLoc();
+ var privateKeyLoc = env.getJwtPrivateKeyLoc();
+
+ if (signingMode.exists()) {
+ localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(signingMode.getValue()));
}
- if (Envs.SP_JWT_SECRET.exists()) {
- localAuthConfig.setTokenSecret(Envs.SP_JWT_SECRET.getValue());
+ if (jwtSecret.exists()) {
+ localAuthConfig.setTokenSecret(jwtSecret.getValue());
}
- if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) {
+ if (publicKeyLoc.exists()) {
try {
- localAuthConfig.setPublicKey(readPublicKey(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue()));
+ localAuthConfig.setPublicKey(readPublicKey(publicKeyLoc.getValue()));
} catch (IOException e) {
incompleteConfig = true;
- LOG.warn("Could not read public key at location " + Envs.SP_JWT_PUBLIC_KEY_LOC);
+ LOG.warn("Could not read public key at location " + publicKeyLoc.getValue());
}
}
- if (!Envs.SP_JWT_SIGNING_MODE.exists()) {
+ if (!signingMode.exists()) {
LOG.info(
"No JWT signing mode provided (using default settings), "
+ "consult the docs to learn how to provide JWT settings");
- } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !Envs.SP_JWT_SECRET.exists()) {
+ } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !jwtSecret.exists()) {
LOG.warn(
"JWT signing mode set to HMAC but no secret provided (falling back to auto-generated secret), "
+ "provide a {} variable",
- Envs.SP_JWT_SECRET.getEnvVariableName());
+ jwtSecret.getEnvVariableName());
} else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.RSA
- && ((!Envs.SP_JWT_PUBLIC_KEY_LOC.exists() || !Envs.SP_JWT_PRIVATE_KEY_LOC.exists()) || incompleteConfig)) {
+ && ((!publicKeyLoc.exists() || !privateKeyLoc.exists()) || incompleteConfig)) {
LOG.warn(
"JWT signing mode set to RSA but no public or private key location provided, "
+ "do you provide {} and {} variables?",
- Envs.SP_JWT_PRIVATE_KEY_LOC.getEnvVariableName(),
- Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
+ privateKeyLoc.getEnvVariableName(),
+ publicKeyLoc.getEnvVariableName());
}
if (!incompleteConfig) {
LOG.info("Updating local auth config with signing mode {}", localAuthConfig.getJwtSigningMode().name());
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
index 56f0b2050..9e88dcf19 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.service.extensions.security;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.service.base.security.UnauthorizedRequestEntryPoint;
import org.slf4j.Logger;
@@ -45,9 +46,11 @@ public class WebSecurityConfig {
private static final Logger LOG = LoggerFactory.getLogger(WebSecurityConfig.class);
private final UserDetailsService userDetailsService;
+ private Environment env;
public WebSecurityConfig() {
this.userDetailsService = username -> null;
+ this.env = Environments.getEnvironment();
}
@Autowired
@@ -91,14 +94,15 @@ public class WebSecurityConfig {
}
private boolean isAnonymousAccess() {
- if (Envs.SP_EXT_AUTH_MODE.exists() && Envs.SP_EXT_AUTH_MODE.getValue().equals("AUTH")) {
- if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) {
+ var extAuthMode = env.getExtensionsAuthMode();
+ if (extAuthMode.exists() && extAuthMode.getValue().equals("AUTH")) {
+ if (env.getJwtPublicKeyLoc().exists()) {
LOG.info("Configured service for authenticated access mode");
return false;
} else {
LOG.warn(
"No env variable {} provided, which is required for authenticated access. Defaulting to anonymous access.",
- Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
+ env.getJwtPublicKeyLoc().getEnvVariableName());
return true;
}
} else {
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
index 81215579a..76b4494c8 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
@@ -17,8 +17,7 @@
*/
package org.apache.streampipes.user.management.encryption;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.jasypt.encryption.StringEncryptor;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
@@ -36,11 +35,11 @@ public class SecretEncryptionManager {
}
private static StringEncryptor getEncryptor() {
+ var env = Environments.getEnvironment();
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
- encryptor.setPassword(Envs.SP_ENCRYPTION_PASSCODE.getValueOrDefault(DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE));
+ encryptor.setPassword(env.getEncryptionPasscode().getValueOrDefault());
encryptor.setIvGenerator(new RandomIvGenerator());
return encryptor;
-
}
}
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
index a17fed07b..4abb09a3c 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.user.management.jwt;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.model.JwtSigningMode;
import org.apache.streampipes.config.backend.model.LocalAuthConfig;
@@ -51,9 +52,11 @@ public class JwtTokenProvider {
public static final String CLAIM_USER = "user";
private static final Logger LOG = LoggerFactory.getLogger(JwtTokenProvider.class);
private BackendConfig config;
+ private Environment env;
public JwtTokenProvider() {
this.config = BackendConfig.INSTANCE;
+ this.env = Environments.getEnvironment();
}
public String createToken(Authentication authentication) {
@@ -117,7 +120,7 @@ public class JwtTokenProvider {
}
private Path getKeyFilePath() {
- return Paths.get(Envs.SP_JWT_PRIVATE_KEY_LOC.getValue());
+ return Paths.get(env.getJwtPrivateKeyLoc().getValue());
}
private LocalAuthConfig authConfig() {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index eb0a49c27..ccc2b9ff5 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.wrapper.standalone.function;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.declarer.IFunctionConfig;
import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer;
@@ -158,8 +159,9 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
private Map<String, SpInputCollector> getInputCollectors(Collection<SpDataStream> streams) throws SpRuntimeException {
Map<String, SpInputCollector> inputCollectors = new HashMap<>();
+ var env = getEnvironment();
for (SpDataStream is : streams) {
- if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) {
+ if (env.getSpDebug().getValueOrDefault()) {
GroundingDebugUtils.modifyGrounding(is.getEventGrounding());
}
inputCollectors.put(is.getElementId(), ProtocolManager.findInputCollector(is.getEventGrounding()
@@ -193,6 +195,10 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
return new SchemaInfo(eventSchema, new ArrayList<>());
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
public abstract IFunctionConfig getFunctionConfig();
public abstract void onServiceStarted(FunctionContext context);