You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/02/16 20:20:43 UTC
[streampipes] branch dev updated: Provide environment variables via Environment provider (#1223) (#1224)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 14724af4d Provide environment variables via Environment provider (#1223) (#1224)
14724af4d is described below
commit 14724af4d639e8ba85b78745ad6bfd3e4045ac77
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Feb 16 21:20:37 2023 +0100
Provide environment variables via Environment provider (#1223) (#1224)
* Provide environment variables via Environment provider (#1223)
* Clean up consul configuration and env variable provider (#1223)
* Fix bug in environment variable provider (#1223)
* Receive database name from environment (#1223)
---
.../streampipes/commons/constants/CustomEnvs.java | 20 +++-
.../commons/constants/DefaultEnvValues.java | 9 +-
.../apache/streampipes/commons/constants/Envs.java | 87 +++++++----------
.../commons/environment/DefaultEnvironment.java | 80 ++++++++++++++++
.../commons/environment/Environment.java | 40 ++++++++
.../commons/environment/Environments.java | 16 +++-
.../environment/variable/EnvironmentVariable.java | 20 ++--
.../streampipes/commons/networking/Networking.java | 21 +++--
.../streampipes/config/backend/BackendConfig.java | 104 ++-------------------
.../config/backend/BackendConfigKeys.java | 11 ---
.../dataexplorer/DataLakeManagementV4.java | 10 +-
.../dataexplorer/query/DataExplorerQuery.java | 10 +-
.../dataexplorer/sdk/DataLakeQueryBuilder.java | 8 +-
.../dataexplorer/v4/query/DataExplorerQueryV4.java | 18 ++--
...StreamPipesClientRuntimeConnectionResolver.java | 21 ++---
.../management/connect/adapter/Adapter.java | 16 ----
.../elements/SendToBrokerAdapterSink.java | 15 ++-
.../elements/SendToJmsAdapterSink.java | 4 +-
.../elements/SendToKafkaAdapterSink.java | 6 +-
.../elements/SendToMqttAdapterSink.java | 4 +-
.../elements/SendToNatsAdapterSink.java | 4 +-
.../simulator/simulator/VehicleDataSimulator.java | 13 ++-
.../simulator/utils/WatertankDataSimulator.java | 13 ++-
.../messaging/kafka/SpKafkaProducer.java | 6 +-
.../runtime/PipelineElementRuntimeInfoFetcher.java | 13 ++-
.../manager/setup/AutoInstallation.java | 56 +++++------
.../resource/management/UserResourceManager.java | 11 ++-
.../pe/InvocablePipelineElementResource.java | 4 +-
.../streampipes/security/jwt/KeyGenerator.java | 5 +-
.../service/core/StreamPipesEnvChecker.java | 40 +++++---
.../svcdiscovery/consul/ConsulProvider.java | 2 +-
.../extensions/security/WebSecurityConfig.java | 12 ++-
.../storage/couchdb/utils/CouchDbConfig.java | 55 -----------
.../streampipes/storage/couchdb/utils/Utils.java | 27 +-----
.../encryption/SecretEncryptionManager.java | 7 +-
.../user/management/jwt/JwtTokenProvider.java | 7 +-
.../standalone/function/StreamPipesFunction.java | 10 +-
37 files changed, 405 insertions(+), 400 deletions(-)
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
index 514c2e5b5..c4cdb950b 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
@@ -17,14 +17,16 @@
*/
package org.apache.streampipes.commons.constants;
+import java.util.Map;
+
public class CustomEnvs {
public static boolean exists(String envVariable) {
- return System.getenv().containsKey(envVariable);
+ return AllEnvs.INSTANCE.getEnvs().containsKey(envVariable);
}
public static String getEnv(String envVariable) {
- return System.getenv(envVariable);
+ return AllEnvs.INSTANCE.getEnvs().get(envVariable);
}
public static Integer getEnvAsInt(String envVariable) {
@@ -34,4 +36,18 @@ public class CustomEnvs {
public static Boolean getEnvAsBoolean(String envVariable) {
return Boolean.parseBoolean(getEnv(envVariable));
}
+
+ private enum AllEnvs {
+ INSTANCE;
+
+ private final Map<String, String> envs;
+
+ AllEnvs() {
+ this.envs = System.getenv();
+ }
+
+ public Map<String, String> getEnvs() {
+ return envs;
+ }
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
index 590abb902..04eb5b564 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java
@@ -25,11 +25,14 @@ public class DefaultEnvValues {
public static final String INITIAL_CLIENT_SECRET_DEFAULT = "my-apache-streampipes-secret-key-change-me";
public static final String CONSUL_HOST_DEFAULT = "consul";
- public static final String CONSUL_HOST_LOCAL = "localhost";
- public static final int CONSUL_PORT_DEFAULT = 8500;
+ public static final String CONSUL_PORT_DEFAULT = "8500";
public static final int MAX_WAIT_TIME_AT_SHUTDOWN_DEFAULT = 10000;
- public static final boolean INSTALL_PIPELINE_ELEMENTS = true;
+ public static final String INSTALL_PIPELINE_ELEMENTS = "true";
public static final String DEFAULT_ENCRYPTION_PASSCODE = "eGgemyGBoILAu3xckoIp";
+
+ public static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000";
+
+ public static final String LOCALHOST = "localhost";
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 067df3dea..8be54973f 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -19,34 +19,38 @@ package org.apache.streampipes.commons.constants;
public enum Envs {
- SP_HOST("SP_HOST", null),
- SP_PORT("SP_PORT", null),
+ SP_HOST("SP_HOST"),
+ SP_PORT("SP_PORT"),
@Deprecated(since = "0.90.0", forRemoval = true)
- SP_CONSUL_LOCATION("CONSUL_LOCATION", "consul", "localhost"),
-
- SP_CONSUL_HOST("SP_CONSUL_HOST", "consul", "localhost"),
- SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"),
- SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", null),
- SP_JWT_SECRET("JWT_SECRET", null),
- SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null),
- SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null),
- SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null),
- SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null),
- SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null),
- SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null),
- SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null),
- SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null),
- SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null),
- SP_CLIENT_USER("SP_CLIENT_USER", null),
- SP_CLIENT_SECRET("SP_CLIENT_SECRET", null),
- SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null),
+ SP_CONSUL_LOCATION("CONSUL_LOCATION",
+ DefaultEnvValues.CONSUL_HOST_DEFAULT,
+ DefaultEnvValues.LOCALHOST),
+
+ SP_CONSUL_HOST("SP_CONSUL_HOST",
+ DefaultEnvValues.CONSUL_HOST_DEFAULT,
+ DefaultEnvValues.LOCALHOST),
+ SP_CONSUL_PORT("SP_CONSUL_PORT", DefaultEnvValues.CONSUL_PORT_DEFAULT),
+ SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", DefaultEnvValues.SP_KAFKA_RETENTION_MS_DEFAULT),
+ SP_JWT_SECRET("JWT_SECRET"),
+ SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"),
+ SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"),
+ SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC"),
+ SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT),
+ SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT),
+ SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT),
+ SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT),
+ SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS),
+ SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE"),
+ SP_CLIENT_USER("SP_CLIENT_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT),
+ SP_CLIENT_SECRET("SP_CLIENT_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT),
+ SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE),
SP_DEBUG("SP_DEBUG", "false"),
- SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null),
+ SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN"),
// CouchDB Storage
SP_COUCHDB_PROTOCOL("SP_COUCHDB_PROTOCOL", "http"),
- SP_COUCHDB_HOST("SP_COUCHDB_HOST", "couchdb", "localhost"),
+ SP_COUCHDB_HOST("SP_COUCHDB_HOST", "couchdb", DefaultEnvValues.LOCALHOST),
SP_COUCHDB_PORT("SP_COUCHDB_PORT", "5984"),
SP_COUCHDB_USER("SP_COUCHDB_USER", "admin"),
SP_COUCHDB_PASSWORD("SP_COUCHDB_PASSWORD", "admin"),
@@ -54,7 +58,7 @@ public enum Envs {
// Time Series Storage
SP_TS_STORAGE_PROTOCOL("SP_TS_STORAGE_PROTOCOL", "http"),
- SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", "localhost"),
+ SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", DefaultEnvValues.LOCALHOST),
SP_TS_STORAGE_PORT("SP_TS_STORAGE_PORT", "8086"),
SP_TS_STORAGE_TOKEN("SP_TS_STORAGE_TOKEN", "sp-admin"),
@@ -64,54 +68,29 @@ public enum Envs {
SP_TS_STORAGE_BUCKET("SP_TS_STORAGE_BUCKET", "sp");
private final String envVariableName;
- private final String defaultValue;
+ private String defaultValue;
- private final String devDefaultValue;
+ private String devDefaultValue;
Envs(String envVariableName, String defaultValue, String devDefaultValue) {
- this.envVariableName = envVariableName;
- this.defaultValue = defaultValue;
+ this(envVariableName, defaultValue);
this.devDefaultValue = devDefaultValue;
}
Envs(String envVariableName, String defaultValue) {
- this.envVariableName = envVariableName;
+ this(envVariableName);
this.defaultValue = defaultValue;
this.devDefaultValue = defaultValue;
}
- public boolean exists() {
- return CustomEnvs.exists(this.envVariableName);
- }
-
- public String getValue() {
- return CustomEnvs.getEnv(this.envVariableName);
- }
-
- public Integer getValueAsInt() {
- return CustomEnvs.getEnvAsInt(this.envVariableName);
- }
-
- public Integer getValueAsIntOrDefault(int defaultValue) {
- return exists() ? getValueAsInt() : defaultValue;
- }
-
- public Boolean getValueAsBoolean() {
- return CustomEnvs.getEnvAsBoolean(this.envVariableName);
- }
-
- public boolean getValueAsBooleanOrDefault(boolean defaultValue) {
- return this.exists() ? this.getValueAsBoolean() : defaultValue;
+ Envs(String envVariableName) {
+ this.envVariableName = envVariableName;
}
public String getEnvVariableName() {
return envVariableName;
}
- public String getValueOrDefault(String defaultValue) {
- return this.exists() ? this.getValue() : defaultValue;
- }
-
public String getDefaultValue() {
return defaultValue;
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 8ccf01948..d58260a90 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -40,6 +40,16 @@ public class DefaultEnvironment implements Environment {
return new BooleanEnvironmentVariable(Envs.SP_DEBUG);
}
+ @Override
+ public StringEnvironmentVariable getServiceHost() {
+ return new StringEnvironmentVariable(Envs.SP_HOST);
+ }
+
+ @Override
+ public IntEnvironmentVariable getServicePort() {
+ return new IntEnvironmentVariable(Envs.SP_PORT);
+ }
+
@Override
public StringEnvironmentVariable getTsStorageProtocol() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE_PROTOCOL);
@@ -94,6 +104,76 @@ public class DefaultEnvironment implements Environment {
return new StringEnvironmentVariable(Envs.SP_COUCHDB_PASSWORD);
}
+ @Override
+ public StringEnvironmentVariable getClientUser() {
+ return new StringEnvironmentVariable(Envs.SP_CLIENT_USER);
+ }
+
+ @Override
+ public StringEnvironmentVariable getClientSecret() {
+ return new StringEnvironmentVariable(Envs.SP_CLIENT_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtSecret() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtPublicKeyLoc() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_PUBLIC_KEY_LOC);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtPrivateKeyLoc() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_PRIVATE_KEY_LOC);
+ }
+
+ @Override
+ public StringEnvironmentVariable getJwtSigningMode() {
+ return new StringEnvironmentVariable(Envs.SP_JWT_SIGNING_MODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getExtensionsAuthMode() {
+ return new StringEnvironmentVariable(Envs.SP_EXT_AUTH_MODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getEncryptionPasscode() {
+ return new StringEnvironmentVariable(Envs.SP_ENCRYPTION_PASSCODE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKafkaRetentionTimeMs() {
+ return new StringEnvironmentVariable(Envs.SP_KAFKA_RETENTION_MS);
+ }
+
+ @Override
+ public BooleanEnvironmentVariable getSetupInstallPipelineElements() {
+ return new BooleanEnvironmentVariable(Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialServiceUserSecret() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER_SECRET);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialServiceUser() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialAdminEmail() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_EMAIL);
+ }
+
+ @Override
+ public StringEnvironmentVariable getInitialAdminPassword() {
+ return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_PASSWORD);
+ }
+
@Override
public StringEnvironmentVariable getConsulLocation() {
return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 63be042c3..2bf591303 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -34,6 +34,13 @@ public interface Environment {
BooleanEnvironmentVariable getSpDebug();
+ // Service base configuration
+
+ StringEnvironmentVariable getServiceHost();
+
+ IntEnvironmentVariable getServicePort();
+
+
// Time series storage env variables
StringEnvironmentVariable getTsStorageProtocol();
@@ -60,4 +67,37 @@ public interface Environment {
StringEnvironmentVariable getCouchDbPassword();
+
+ // JWT & Authentication
+
+ StringEnvironmentVariable getClientUser();
+
+ StringEnvironmentVariable getClientSecret();
+
+ StringEnvironmentVariable getJwtSecret();
+
+ StringEnvironmentVariable getJwtPublicKeyLoc();
+
+ StringEnvironmentVariable getJwtPrivateKeyLoc();
+
+ StringEnvironmentVariable getJwtSigningMode();
+
+ StringEnvironmentVariable getExtensionsAuthMode();
+
+ StringEnvironmentVariable getEncryptionPasscode();
+
+ // Messaging
+ StringEnvironmentVariable getKafkaRetentionTimeMs();
+
+
+ // Setup
+ BooleanEnvironmentVariable getSetupInstallPipelineElements();
+
+ StringEnvironmentVariable getInitialServiceUserSecret();
+
+ StringEnvironmentVariable getInitialServiceUser();
+
+ StringEnvironmentVariable getInitialAdminEmail();
+
+ StringEnvironmentVariable getInitialAdminPassword();
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
index 631e72dcc..c25d98bbb 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java
@@ -21,6 +21,20 @@ package org.apache.streampipes.commons.environment;
public class Environments {
public static Environment getEnvironment() {
- return new DefaultEnvironment();
+ return Env.DEFAULT.getEnvironment();
+ }
+
+ private enum Env {
+ DEFAULT(new DefaultEnvironment());
+
+ private final Environment environment;
+
+ Env(Environment env) {
+ this.environment = env;
+ }
+
+ public Environment getEnvironment() {
+ return environment;
+ }
}
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
index e02031df4..4219c51d3 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java
@@ -23,20 +23,14 @@ import org.apache.streampipes.commons.constants.Envs;
public abstract class EnvironmentVariable<T> {
- private final T defaultValue;
+ private final String unparsedDefaultValue;
private final String envVariableName;
private boolean devModeActive;
- public EnvironmentVariable(String envVariableName,
- T defaultValue) {
- this.envVariableName = envVariableName;
- this.defaultValue = defaultValue;
- }
-
public EnvironmentVariable(Envs envVariable) {
this.envVariableName = envVariable.getEnvVariableName();
this.devModeActive = isDevModeActive();
- this.defaultValue = devModeActive ? parse(envVariable.getDevDefaultValue()) : parse(envVariable.getDefaultValue());
+ this.unparsedDefaultValue = devModeActive ? envVariable.getDevDefaultValue() : envVariable.getDefaultValue();
}
public T getValue() {
@@ -48,7 +42,7 @@ public abstract class EnvironmentVariable<T> {
}
public T getValueOrDefault() {
- return exists() ? getValue() : defaultValue;
+ return exists() ? getValue() : parse(unparsedDefaultValue);
}
public T getValueOrReturn(T defaultValue) {
@@ -59,6 +53,14 @@ public abstract class EnvironmentVariable<T> {
return resolver.resolve();
}
+ public T getDefault() {
+ return parse(unparsedDefaultValue);
+ }
+
+ public String getEnvVariableName() {
+ return this.envVariableName;
+ }
+
private boolean isDevModeActive() {
return CustomEnvs.getEnvAsBoolean(Envs.SP_DEBUG.getEnvVariableName());
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java
index 4034c3a9c..0f38cf9cd 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java
@@ -17,7 +17,8 @@
*/
package org.apache.streampipes.commons.networking;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,10 +36,11 @@ public class Networking {
private static final String DEFAULT_LOCALHOST_IP = "127.0.0.1";
public static String getHostname() throws UnknownHostException {
+ var svcHostname = getEnvironment().getServiceHost();
String selectedAddress;
- if (Envs.SP_HOST.exists()) {
- selectedAddress = Envs.SP_HOST.getValue();
- LOG.info("Using IP from provided environment variable {}: {}", Envs.SP_HOST, selectedAddress);
+ if (svcHostname.exists()) {
+ selectedAddress = svcHostname.getValue();
+ LOG.info("Using IP from provided environment variable {}: {}", svcHostname.getEnvVariableName(), selectedAddress);
} else {
selectedAddress = InetAddress.getLocalHost().getHostAddress();
@@ -79,10 +81,11 @@ public class Networking {
}
public static Integer getPort(Integer defaultPort) {
+ var servicePort = getEnvironment().getServicePort();
Integer selectedPort;
- if (Envs.SP_PORT.exists()) {
- selectedPort = Envs.SP_PORT.getValueAsInt();
- LOG.info("Using port from provided environment variable {}: {}", Envs.SP_PORT, selectedPort);
+ if (servicePort.exists()) {
+ selectedPort = servicePort.getValue();
+ LOG.info("Using port from provided environment variable {}: {}", servicePort.getEnvVariableName(), selectedPort);
} else {
selectedPort = defaultPort;
LOG.info("Using default port: {}", defaultPort);
@@ -90,4 +93,8 @@ public class Networking {
return selectedPort;
}
+
+ private static Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 43e602362..caa2c74d8 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -19,7 +19,8 @@
package org.apache.streampipes.config.backend;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.random.TokenGenerator;
import org.apache.streampipes.config.backend.model.EmailConfig;
import org.apache.streampipes.config.backend.model.GeneralConfig;
@@ -28,10 +29,7 @@ import org.apache.streampipes.model.config.MessagingSettings;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.commons.lang3.RandomStringUtils;
-
import java.io.File;
-import java.security.SecureRandom;
public enum BackendConfig {
INSTANCE;
@@ -59,9 +57,6 @@ public enum BackendConfig {
config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
config.register(BackendConfigKeys.ZOOKEEPER_HOST, "zookeeper", "Hostname for backend service for zookeeper");
config.register(BackendConfigKeys.ZOOKEEPER_PORT, 2181, "Port for backend service for zookeeper");
- config.register(BackendConfigKeys.ELASTICSEARCH_HOST, "elasticsearch", "Hostname for elasticsearch service");
- config.register(BackendConfigKeys.ELASTICSEARCH_PORT, 9200, "Port for elasticsearch service");
- config.register(BackendConfigKeys.ELASTICSEARCH_PROTOCOL, "http", "Protocol the elasticsearch service");
config.register(BackendConfigKeys.IS_CONFIGURED, false,
"Boolean that indicates whether streampipes is " + "already configured or not");
config.register(BackendConfigKeys.IS_SETUP_RUNNING, false,
@@ -70,13 +65,6 @@ public enum BackendConfig {
"The directory where " + "pipeline element assets are stored.");
config.register(BackendConfigKeys.FILES_DIR, makeFileLocation(),
"The directory where " + "pipeline element files are stored.");
- config.register(BackendConfigKeys.DATA_LAKE_HOST, "elasticsearch",
- "The host of the data base used for the data lake");
- config.register(BackendConfigKeys.DATA_LAKE_PORT, 9200, "The port of the data base used for the data lake");
-
- config.register(BackendConfigKeys.INFLUX_HOST, "influxdb", "The host of the influx data base");
- config.register(BackendConfigKeys.INFLUX_PORT, 8086, "The hist of the influx data base");
- config.register(BackendConfigKeys.INFLUX_DATA_BASE, "sp", "The influx data base name");
config.registerObject(BackendConfigKeys.MESSAGING_SETTINGS, DefaultMessagingSettings.make(),
"Default Messaging Settings");
@@ -103,27 +91,6 @@ public enum BackendConfig {
+ File.separator;
}
- private String randomKey() {
- return RandomStringUtils.random(10, 0, possibleCharacters.length - 1,
- false, false, possibleCharacters, new SecureRandom());
- }
-
- public String getBackendHost() {
- return config.getString(BackendConfigKeys.BACKEND_HOST);
- }
-
- public int getBackendPort() {
- return config.getInteger(BackendConfigKeys.BACKEND_PORT);
- }
-
- public String getBackendUrl() {
- return "http://" + getBackendHost() + ":" + getBackendPort();
- }
-
- public String getBackendApiUrl() {
- return getBackendUrl() + "/streampipes-backend/";
- }
-
public String getJmsHost() {
return config.getString(BackendConfigKeys.JMS_HOST);
}
@@ -156,10 +123,6 @@ public enum BackendConfig {
return config.getInteger(BackendConfigKeys.KAFKA_PORT);
}
- public String getKafkaUrl() {
- return getKafkaHost() + ":" + getKafkaPort();
- }
-
public String getZookeeperHost() {
return config.getString(BackendConfigKeys.ZOOKEEPER_HOST);
}
@@ -189,30 +152,6 @@ public enum BackendConfig {
config.setBoolean(BackendConfigKeys.IS_CONFIGURED, b);
}
- public String getElasticsearchHost() {
- return config.getString(BackendConfigKeys.ELASTICSEARCH_HOST);
- }
-
- public int getElasticsearchPort() {
- return config.getInteger(BackendConfigKeys.ELASTICSEARCH_PORT);
- }
-
- public String getElasticsearchProtocol() {
- return config.getString(BackendConfigKeys.ELASTICSEARCH_PROTOCOL);
- }
-
- public String getKafkaRestHost() {
- return config.getString(BackendConfigKeys.KAFKA_REST_HOST);
- }
-
- public Integer getKafkaRestPort() {
- return config.getInteger(BackendConfigKeys.KAFKA_REST_PORT);
- }
-
- public String getKafkaRestUrl() {
- return "http://" + getKafkaRestHost() + ":" + getKafkaRestPort();
- }
-
public String getAssetDir() {
return config.getString(BackendConfigKeys.ASSETS_DIR);
}
@@ -221,30 +160,6 @@ public enum BackendConfig {
return config.getString(BackendConfigKeys.FILES_DIR);
}
- public String getDatalakeHost() {
- return config.getString(BackendConfigKeys.DATA_LAKE_HOST);
- }
-
- public int getDatalakePort() {
- return config.getInteger(BackendConfigKeys.DATA_LAKE_PORT);
- }
-
- public String getDataLakeUrl() {
- return getDatalakeHost() + ":" + getDatalakePort();
- }
-
- public String getInfluxHost() {
- return config.getString(BackendConfigKeys.INFLUX_HOST);
- }
-
- public int getInfluxPort() {
- return config.getInteger(BackendConfigKeys.INFLUX_PORT);
- }
-
- public String getInfluxDatabaseName() {
- return config.getString(BackendConfigKeys.INFLUX_DATA_BASE);
- }
-
public LocalAuthConfig getLocalAuthConfig() {
return config.getObject(BackendConfigKeys.LOCAL_AUTH_CONFIG, LocalAuthConfig.class,
LocalAuthConfig.fromDefaults(getJwtSecret()));
@@ -270,24 +185,21 @@ public enum BackendConfig {
config.setObject(BackendConfigKeys.LOCAL_AUTH_CONFIG, authConfig);
}
- public boolean isSetupRunning() {
- return config.getBoolean(BackendConfigKeys.IS_SETUP_RUNNING);
- }
-
public void updateSetupStatus(boolean status) {
config.setBoolean(BackendConfigKeys.IS_SETUP_RUNNING, status);
}
private String getJwtSecret() {
- if (Envs.SP_JWT_SECRET.exists()) {
- return Envs.SP_JWT_SECRET.getValue();
- } else {
- return makeDefaultJwtSecret();
- }
+ var env = getEnvironment();
+ return env.getJwtSecret().getValueOrResolve(this::makeDefaultJwtSecret);
}
private String makeDefaultJwtSecret() {
return TokenGenerator.generateNewToken();
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index 3d1950b75..9970f241f 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -31,21 +31,10 @@ public class BackendConfigKeys {
public static final String KAFKA_PORT = "SP_KAFKA_PORT";
public static final String ZOOKEEPER_HOST = "SP_ZOOKEEPER_HOST";
public static final String ZOOKEEPER_PORT = "SP_ZOOKEEPER_PORT";
- public static final String ELASTICSEARCH_HOST = "SP_ELASTICSEARCH_HOST";
- public static final String ELASTICSEARCH_PORT = "SP_ELASTICSEARCH_PORT";
- public static final String ELASTICSEARCH_PROTOCOL = "SP_ELASTICSEARCH_PROTOCOL";
public static final String IS_CONFIGURED = "SP_IS_CONFIGURED";
public static final String IS_SETUP_RUNNING = "SP_IS_SETUP_RUNNING";
- public static final String KAFKA_REST_HOST = "SP_KAFKA_REST_HOST";
- public static final String KAFKA_REST_PORT = "SP_KAFKA_REST_PORT";
public static final String ASSETS_DIR = "SP_ASSETS_DIR";
public static final String FILES_DIR = "SP_FILES_DIR";
- public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
- public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-
- public static final String INFLUX_PORT = "SP_INFLUX_PORT";
- public static final String INFLUX_HOST = "SP_INFLUX_HOST";
- public static final String INFLUX_DATA_BASE = "SP_INFLUX_DATA_BASE";
public static final String MESSAGING_SETTINGS = "SP_MESSAGING_SETTINGS";
public static final String LOCAL_AUTH_CONFIG = "SP_LOCAL_AUTH_CONFIG";
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index cb73aaddf..6cc0d5620 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
@@ -184,12 +185,13 @@ public class DataLakeManagementV4 {
public Map<String, Object> getTagValues(String measurementId,
String fields) {
InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
+ String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
Map<String, Object> tags = new HashMap<>();
if (fields != null && !("".equals(fields))) {
List<String> fieldList = Arrays.asList(fields.split(","));
fieldList.forEach(f -> {
String q =
- "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId
+ "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
+ "\" WITH KEY = \"" + f + "\"";
Query query = new Query(q);
QueryResult queryResult = influxDB.query(query);
@@ -287,4 +289,8 @@ public class DataLakeManagementV4 {
private IDataLakeStorage getDataLakeStorage() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
}
+
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
index 3bd5140e0..33cd9b997 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
@@ -17,7 +17,8 @@
*/
package org.apache.streampipes.dataexplorer.query;
-import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.model.datalake.DataSeries;
import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -33,8 +34,9 @@ public abstract class DataExplorerQuery<T> {
public T executeQuery() throws RuntimeException {
InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
+ var databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
DataExplorerQueryBuilder queryBuilder =
- DataExplorerQueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ DataExplorerQueryBuilder.create(databaseName);
getQuery(queryBuilder);
Query query = queryBuilder.toQuery();
org.influxdb.dto.QueryResult result;
@@ -82,6 +84,10 @@ public abstract class DataExplorerQuery<T> {
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
protected abstract void getQuery(DataExplorerQueryBuilder queryBuilder);
protected abstract T postQuery(org.influxdb.dto.QueryResult result) throws RuntimeException;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
index 21008a315..71c091348 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.dataexplorer.sdk;
-import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
import org.influxdb.dto.Query;
@@ -48,11 +49,14 @@ public class DataLakeQueryBuilder {
private int limit = Integer.MIN_VALUE;
private int offset = Integer.MIN_VALUE;
+ private Environment env;
+
private DataLakeQueryBuilder(String measurementId) {
this.measurementId = measurementId;
this.selectionQuery = select();
this.whereClauses = new ArrayList<>();
this.groupByClauses = new ArrayList<>();
+ this.env = Environments.getEnvironment();
}
public static DataLakeQueryBuilder create(String measurementId) {
@@ -195,7 +199,7 @@ public class DataLakeQueryBuilder {
public Query build() {
var selectQuery =
- this.selectionQuery.from(BackendConfig.INSTANCE.getInfluxDatabaseName(), "\"" + measurementId + "\"");
+ this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), "\"" + measurementId + "\"");
this.whereClauses.forEach(selectQuery::where);
if (this.groupByClauses.size() > 0) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
index 5bb56b375..8cf455d8e 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.dataexplorer.v4.query;
-import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
import org.apache.streampipes.dataexplorer.v4.params.FillParams;
@@ -66,9 +67,7 @@ public class DataExplorerQueryV4 {
private boolean appendId = false;
private String forId;
- public DataExplorerQueryV4() {
-
- }
+ private Environment env;
public DataExplorerQueryV4(Map<String, QueryParamsV4> params,
String forId) {
@@ -79,11 +78,12 @@ public class DataExplorerQueryV4 {
public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
this.params = params;
+ this.env = Environments.getEnvironment();
this.maximumAmountOfEvents = -1;
}
public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) {
- this.params = params;
+ this(params);
this.maximumAmountOfEvents = maximumAmountOfEvents;
}
@@ -92,7 +92,7 @@ public class DataExplorerQueryV4 {
List<QueryElement<?>> queryElements = getQueryElements();
if (this.maximumAmountOfEvents != -1) {
- QueryBuilder countQueryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ QueryBuilder countQueryBuilder = QueryBuilder.create(getDatabaseName());
Query countQuery = countQueryBuilder.build(queryElements, true);
QueryResult countQueryResult = influxDB.query(countQuery);
Double amountOfQueryResults = getAmountOfResults(countQueryResult);
@@ -105,7 +105,7 @@ public class DataExplorerQueryV4 {
}
}
- QueryBuilder queryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ QueryBuilder queryBuilder = QueryBuilder.create(getDatabaseName());
Query query = queryBuilder.build(queryElements, false);
LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
@@ -237,4 +237,8 @@ public class DataExplorerQueryV4 {
return queryElements;
}
+
+ private String getDatabaseName() {
+ return env.getTsStorageBucket().getValueOrDefault();
+ }
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
index 8bb4d28ae..583bb5ab3 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java
@@ -20,8 +20,8 @@ package org.apache.streampipes.extensions.management.client;
import org.apache.streampipes.client.credentials.CredentialsProvider;
import org.apache.streampipes.client.credentials.StreamPipesTokenCredentials;
import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.commons.networking.Networking;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
@@ -38,9 +38,10 @@ import java.util.List;
public class StreamPipesClientRuntimeConnectionResolver implements ClientConnectionUrlResolver {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesClientRuntimeConnectionResolver.class);
+ private Environment env;
public StreamPipesClientRuntimeConnectionResolver() {
-
+ this.env = Environments.getEnvironment();
}
@Override
@@ -52,7 +53,7 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect
public String getBaseUrl() throws SpRuntimeException {
List<String> baseUrls = findClientServices();
if (baseUrls.size() > 0) {
- if (Envs.SP_DEBUG.exists()) {
+ if (env.getSpDebug().getValueOrDefault()) {
try {
return "http://" + Networking.getHostname() + ":" + 8030;
} catch (UnknownHostException e) {
@@ -67,19 +68,11 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect
}
private String getClientApiUser() {
- if (Envs.SP_CLIENT_USER.exists()) {
- return Envs.SP_CLIENT_USER.getValue();
- } else {
- return DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT;
- }
+ return env.getClientUser().getValueOrDefault();
}
private String getClientApiSecret() {
- if (Envs.SP_CLIENT_SECRET.exists()) {
- return Envs.SP_CLIENT_SECRET.getValue();
- } else {
- return DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT;
- }
+ return env.getClientSecret().getValueOrDefault();
}
private List<String> findClientServices() {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
index 4d87918d6..1f797ab93 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java
@@ -48,33 +48,17 @@ public abstract class Adapter<T extends AdapterDescription> implements IAdapter<
@Override
public void changeEventGrounding(TransportProtocol transportProtocol) {
-
if (transportProtocol instanceof JmsTransportProtocol) {
SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- //((JmsTransportProtocol) transportProtocol).setPort(61616);
- }
sink.changeTransportProtocol((JmsTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof KafkaTransportProtocol) {
SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
- }
sink.changeTransportProtocol((KafkaTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof MqttTransportProtocol) {
SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- //((MqttTransportProtocol) transportProtocol).setPort(1883);
- }
sink.changeTransportProtocol((MqttTransportProtocol) transportProtocol);
} else if (transportProtocol instanceof NatsTransportProtocol) {
SendToNatsAdapterSink sink = (SendToNatsAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- }
sink.changeTransportProtocol((NatsTransportProtocol) transportProtocol);
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
index 3d2109a4e..7b7c653b8 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
@@ -48,8 +50,8 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
.getEventGrounding()
.getTransportProtocol());
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- modifyProtocolForDebugging();
+ if (getEnvironment().getSpDebug().getValueOrDefault()) {
+ modifyProtocolForDebugging(this.protocol);
}
TransportFormat transportFormat = adapterDescription
@@ -88,12 +90,11 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
producer.publish(event);
}
- protected void modifyProtocolForDebugging() {
-
- }
+ public abstract void modifyProtocolForDebugging(T transportProtocol);
public void changeTransportProtocol(T transportProtocol) {
try {
+ if (getEnvironment().getSpDebug().getValueOrDefault()) { modifyProtocolForDebugging(transportProtocol); } // debug-only override, same guard as in the constructor
producer.disconnect();
producer.connect(transportProtocol);
} catch (SpRuntimeException e) {
@@ -101,6 +102,10 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple
}
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
index a392f6c7f..17f954769 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
@@ -30,7 +30,7 @@ public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportPr
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(JmsTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
index b92dc711a..a619140f5 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
@@ -30,8 +30,8 @@ public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTranspo
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
- this.protocol.setKafkaPort(9094);
+ public void modifyProtocolForDebugging(KafkaTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
+ protocol.setKafkaPort(9094);
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
index 62351bf89..04a481434 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
@@ -30,7 +30,7 @@ public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransport
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(MqttTransportProtocol transportProtocol) {
+ transportProtocol.setBrokerHostname("localhost"); // rewrite the protocol passed in, not the sink's own protocol field
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
index 89b7471a2..932183c18 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java
@@ -31,7 +31,7 @@ public class SendToNatsAdapterSink extends SendToBrokerAdapterSink<NatsTransport
}
@Override
- public void modifyProtocolForDebugging() {
- this.protocol.setBrokerHostname("localhost");
+ public void modifyProtocolForDebugging(NatsTransportProtocol protocol) {
+ protocol.setBrokerHostname("localhost");
}
}
diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
index 87622c56d..63a036b6a 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.sources.vehicle.simulator.simulator;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -37,6 +38,12 @@ public class VehicleDataSimulator implements Runnable {
private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json";
+ private Environment env;
+
+ public VehicleDataSimulator() {
+ this.env = Environments.getEnvironment();
+ }
+
private void initSimulation() {
try {
ConfigExtractor configExtractor =
@@ -70,12 +77,12 @@ public class VehicleDataSimulator implements Runnable {
}
private String getKafkaHost(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
}
private Integer getKafkaPort(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
}
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
index 810054d0f..35dd6f2e0 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.sources.watertank.simulator.utils;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -37,6 +38,12 @@ public class WatertankDataSimulator implements Runnable {
private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json";
+ private Environment env;
+
+ public WatertankDataSimulator() {
+ this.env = Environments.getEnvironment();
+ }
+
private void initSimulation() {
try {
ConfigExtractor configExtractor =
@@ -61,12 +68,12 @@ public class WatertankDataSimulator implements Runnable {
}
private String getKafkaHost(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
}
private Integer getKafkaPort(ConfigExtractor configExtractor) {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()
+ return env.getSpDebug().getValueOrDefault()
? 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index afd3d44a7..407f2d028 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.messaging.kafka;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
@@ -49,7 +49,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
private static final String COLON = ":";
- private static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000";
private String brokerUrl;
private String topic;
@@ -125,8 +124,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
if (!topicExists(topics)) {
Map<String, String> topicConfig = new HashMap<>();
- String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists()
- ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
+ String retentionTime = Environments.getEnvironment().getKafkaRetentionTimeMs().getValueOrDefault();
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 08866eb31..2762bad07 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,7 +17,8 @@
*/
package org.apache.streampipes.manager.runtime;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
@@ -43,15 +44,17 @@ public enum PipelineElementRuntimeInfoFetcher {
private static final int FETCH_INTERVAL_MS = 300;
private final Map<String, SpDataFormatConverter> converterMap;
+ private Environment env;
PipelineElementRuntimeInfoFetcher() {
this.converterMap = new HashMap<>();
+ this.env = Environments.getEnvironment();
}
public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
var topic = getOutputTopic(spDataStream);
var protocol = spDataStream.getEventGrounding().getTransportProtocol();
- if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) {
+ if (env.getSpDebug().getValueOrDefault()) {
protocol.setBrokerHostname("localhost");
}
if (!converterMap.containsKey(topic)) {
@@ -143,7 +146,7 @@ public enum PipelineElementRuntimeInfoFetcher {
String topic) throws SpRuntimeException {
final String[] result = {null};
// Change kafka config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (getEnvironment().getSpDebug().getValueOrDefault()) {
protocol.setKafkaPort(9094);
}
@@ -161,4 +164,8 @@ public enum PipelineElementRuntimeInfoFetcher {
return result[0];
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
index 54d5808ed..0cad980a5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
@@ -17,9 +17,9 @@
*/
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.commons.constants.CustomEnvs;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.model.client.setup.InitialSettings;
@@ -33,6 +33,12 @@ public class AutoInstallation {
private static final Logger LOG = LoggerFactory.getLogger(AutoInstallation.class);
+ private Environment env;
+
+ public AutoInstallation() {
+ this.env = Environments.getEnvironment();
+ }
+
public void startAutoInstallation() {
InitialSettings settings = collectInitialSettings();
@@ -64,57 +70,37 @@ public class AutoInstallation {
}
private boolean autoInstallPipelineElements() {
- if (Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.exists()) {
- return Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.getValueAsBoolean();
- } else {
- return DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS;
- }
+ return env.getSetupInstallPipelineElements().getValueOrDefault();
}
private String findServiceAccountSecret() {
- return getStringOrDefault(
- Envs.SP_INITIAL_SERVICE_USER_SECRET.getEnvVariableName(),
- DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT
- );
+ return env.getInitialServiceUserSecret().getValueOrDefault();
}
private String findServiceAccountName() {
- return getStringOrDefault(
- Envs.SP_INITIAL_SERVICE_USER.getEnvVariableName(),
- DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT
- );
+ return env.getInitialServiceUser().getValueOrDefault();
}
private String findAdminUser() {
return getStringOrDefault(
- Envs.SP_INITIAL_ADMIN_EMAIL.getEnvVariableName(),
- DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT
+ env.getInitialAdminEmail()
);
}
private String findAdminPassword() {
return getStringOrDefault(
- Envs.SP_INITIAL_ADMIN_PASSWORD.getEnvVariableName(),
- DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT
+ env.getInitialAdminPassword()
);
}
- private String getStringOrDefault(String envVariable, String defaultValue) {
- boolean exists = exists(envVariable);
- if (exists) {
- LOG.info("Using provided environment variable {}", envVariable);
- return getString(envVariable);
+ private String getStringOrDefault(StringEnvironmentVariable variable) {
+ String name = variable.getEnvVariableName();
+ if (variable.exists()) {
+ LOG.info("Using provided environment variable {}", name);
+ return variable.getValue();
} else {
- LOG.info("Environment variable {} not found, using default value {}", envVariable, defaultValue);
- return defaultValue;
+ LOG.info("Environment variable {} not found, using default value {}", name, variable.getDefault());
+ return variable.getDefault();
}
}
-
- private boolean exists(String envVariable) {
- return CustomEnvs.exists(envVariable);
- }
-
- private String getString(String envVariable) {
- return CustomEnvs.getEnv(envVariable);
- }
}
diff --git a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
index 506ac0e78..b42cbc0cd 100644
--- a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
+++ b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
@@ -18,8 +18,8 @@
package org.apache.streampipes.resource.management;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.UserNotFoundException;
import org.apache.streampipes.commons.exceptions.UsernameAlreadyTakenException;
import org.apache.streampipes.mail.MailSender;
@@ -65,8 +65,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
}
public Principal getServiceAdmin() {
+ var env = getEnvironment();
return db.getServiceAccount(
- Envs.SP_INITIAL_SERVICE_USER.getValueOrDefault(DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT)
+ env.getInitialServiceUser().getValueOrDefault()
);
}
@@ -160,5 +161,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
return StorageDispatcher.INSTANCE.getNoSqlStore().getUserActivationTokenStorage();
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
}
diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
index fae16db4b..37633c631 100644
--- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
+++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.rest.extensions.pe;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.declarer.Declarer;
@@ -197,7 +197,7 @@ public abstract class InvocablePipelineElementResource<K extends InvocableStream
protected abstract K createGroundingDebugInformation(K graph);
private Boolean isDebug() {
- return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean();
+ return Environments.getEnvironment().getSpDebug().getValueOrDefault();
}
private String getServiceGroup() {
diff --git a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
index ceda5a079..31592b984 100644
--- a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
+++ b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.security.jwt;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
@@ -66,7 +66,8 @@ public class KeyGenerator {
}
public String readKey() throws IOException {
- return Files.readString(Paths.get(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue()), Charset.defaultCharset());
+ var publicKeyLoc = Environments.getEnvironment().getJwtPublicKeyLoc().getValue();
+ return Files.readString(Paths.get(publicKeyLoc), Charset.defaultCharset());
}
public Key makeKeyForRsa(String key) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException {
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
index a2fac136d..64e187c78 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.service.core;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.model.JwtSigningMode;
import org.apache.streampipes.config.backend.model.LocalAuthConfig;
@@ -36,6 +37,12 @@ public class StreamPipesEnvChecker {
BackendConfig coreConfig;
+ private Environment env;
+
+ public StreamPipesEnvChecker() {
+ this.env = Environments.getEnvironment();
+ }
+
public void updateEnvironmentVariables() {
this.coreConfig = BackendConfig.INSTANCE;
@@ -46,37 +53,42 @@ public class StreamPipesEnvChecker {
private void updateJwtSettings() {
LocalAuthConfig localAuthConfig = coreConfig.getLocalAuthConfig();
boolean incompleteConfig = false;
- if (Envs.SP_JWT_SIGNING_MODE.exists()) {
- localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(Envs.SP_JWT_SIGNING_MODE.getValue()));
+ var signingMode = env.getJwtSigningMode();
+ var jwtSecret = env.getJwtSecret();
+ var publicKeyLoc = env.getJwtPublicKeyLoc();
+ var privateKeyLoc = env.getJwtPrivateKeyLoc();
+
+ if (signingMode.exists()) {
+ localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(signingMode.getValue()));
}
- if (Envs.SP_JWT_SECRET.exists()) {
- localAuthConfig.setTokenSecret(Envs.SP_JWT_SECRET.getValue());
+ if (jwtSecret.exists()) {
+ localAuthConfig.setTokenSecret(jwtSecret.getValue());
}
- if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) {
+ if (publicKeyLoc.exists()) {
try {
- localAuthConfig.setPublicKey(readPublicKey(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue()));
+ localAuthConfig.setPublicKey(readPublicKey(publicKeyLoc.getValue()));
} catch (IOException e) {
incompleteConfig = true;
- LOG.warn("Could not read public key at location " + Envs.SP_JWT_PUBLIC_KEY_LOC);
+ LOG.warn("Could not read public key at location " + publicKeyLoc.getValue());
}
}
- if (!Envs.SP_JWT_SIGNING_MODE.exists()) {
+ if (!signingMode.exists()) {
LOG.info(
"No JWT signing mode provided (using default settings), "
+ "consult the docs to learn how to provide JWT settings");
- } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !Envs.SP_JWT_SECRET.exists()) {
+ } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !jwtSecret.exists()) {
LOG.warn(
"JWT signing mode set to HMAC but no secret provided (falling back to auto-generated secret), "
+ "provide a {} variable",
- Envs.SP_JWT_SECRET.getEnvVariableName());
+ jwtSecret.getEnvVariableName());
} else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.RSA
- && ((!Envs.SP_JWT_PUBLIC_KEY_LOC.exists() || !Envs.SP_JWT_PRIVATE_KEY_LOC.exists()) || incompleteConfig)) {
+ && ((!publicKeyLoc.exists() || !privateKeyLoc.exists()) || incompleteConfig)) {
LOG.warn(
"JWT signing mode set to RSA but no public or private key location provided, "
+ "do you provide {} and {} variables?",
- Envs.SP_JWT_PRIVATE_KEY_LOC.getEnvVariableName(),
- Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
+ privateKeyLoc.getEnvVariableName(),
+ publicKeyLoc.getEnvVariableName());
}
if (!incompleteConfig) {
LOG.info("Updating local auth config with signing mode {}", localAuthConfig.getJwtSigningMode().name());
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
index cd7282d32..9148a69ac 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java
@@ -97,7 +97,7 @@ public enum ConsulProvider {
return environment.getConsulLocation().getValue();
} else {
if (environment.getSpDebug().getValueOrReturn(false)) {
- return DefaultEnvValues.CONSUL_HOST_LOCAL;
+ return DefaultEnvValues.LOCALHOST;
} else {
return environment.getConsulHost().getValueOrDefault();
}
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
index 56f0b2050..9e88dcf19 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.service.extensions.security;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.service.base.security.UnauthorizedRequestEntryPoint;
import org.slf4j.Logger;
@@ -45,9 +46,11 @@ public class WebSecurityConfig {
private static final Logger LOG = LoggerFactory.getLogger(WebSecurityConfig.class);
private final UserDetailsService userDetailsService;
+ private Environment env;
public WebSecurityConfig() {
this.userDetailsService = username -> null;
+ this.env = Environments.getEnvironment();
}
@Autowired
@@ -91,14 +94,15 @@ public class WebSecurityConfig {
}
private boolean isAnonymousAccess() {
- if (Envs.SP_EXT_AUTH_MODE.exists() && Envs.SP_EXT_AUTH_MODE.getValue().equals("AUTH")) {
- if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) {
+ var extAuthMode = env.getExtensionsAuthMode();
+ if (extAuthMode.exists() && extAuthMode.getValue().equals("AUTH")) {
+ if (env.getJwtPublicKeyLoc().exists()) {
LOG.info("Configured service for authenticated access mode");
return false;
} else {
LOG.warn(
"No env variable {} provided, which is required for authenticated access. Defaulting to anonymous access.",
- Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName());
+ env.getJwtPublicKeyLoc().getEnvVariableName());
return true;
}
} else {
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java
deleted file mode 100644
index 8cbdda055..000000000
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.storage.couchdb.utils;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-
-public enum CouchDbConfig {
-
- INSTANCE;
-
- private static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
- private static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
- private static final String PROTOCOL = "PROTOCOL";
- private SpConfig config;
-
- CouchDbConfig() {
- config = SpServiceDiscovery.getSpConfig("storage/couchdb");
- config.register(COUCHDB_HOST, "couchdb", "Hostname for the couch db service");
- config.register(COUCHDB_PORT, 5984, "Port for the couch db service");
- config.register(PROTOCOL, "http", "Protocol the couch db service");
- }
-
- public String getHost() {
- return config.getString(COUCHDB_HOST);
- }
-
- public void setHost(String host) {
- config.setString(COUCHDB_HOST, host);
- }
-
- public int getPort() {
- return config.getInteger(COUCHDB_PORT);
- }
-
- public String getProtocol() {
- return config.getString(PROTOCOL);
- }
-}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index c9f36caf8..7b937a77c 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -63,18 +63,10 @@ public class Utils {
return getCouchDbGsonClient("label");
}
- public static CouchDbClient getCouchDbConnectWorkerContainerClient() {
- return getCouchDbGsonClient("connectworkercontainer");
- }
-
public static CouchDbClient getCouchDbFileMetadataClient() {
return getCouchDbGsonClient("filemetadata");
}
- public static CouchDbClient getCouchDbAdapterTemplateClient() {
- return getCouchDbAdapterClient("adaptertemplate");
- }
-
public static CouchDbClient getCouchDbAssetDashboardClient() {
return getCouchDbGsonClient("assetdashboard");
}
@@ -91,14 +83,6 @@ public class Utils {
return getCouchDbGsonClient("pipeline");
}
- public static CouchDbClient getCouchDbUserGroupStorage() {
- return getCouchDbGsonClient("usergroup");
- }
-
- public static CouchDbClient getCouchDbSepaInvocationClient() {
- return getCouchDbGsonClient("invocation");
- }
-
public static CouchDbClient getCouchDbConnectionClient() {
return getCouchDbStandardSerializerClient("connection");
}
@@ -157,10 +141,6 @@ public class Utils {
return getCouchDbStandardSerializerClient("pipelinecategories");
}
- public static CouchDbClient getCouchDbElasticsearchFilesEndppointClient() {
- return getCouchDbStandardSerializerClient("file-export-endpoints-elasticsearch");
- }
-
public static CouchDbClient getCouchDbDataLakeClient() {
return getCouchDbGsonClient("data-lake");
}
@@ -208,9 +188,10 @@ public class Utils {
}
private static String toUrl() {
- return CouchDbConfig.INSTANCE.getProtocol()
- + "://" + CouchDbConfig.INSTANCE.getHost()
- + ":" + CouchDbConfig.INSTANCE.getPort();
+ var env = getEnvironment();
+ return env.getCouchDbProtocol().getValueOrDefault()
+ + "://" + env.getCouchDbHost().getValueOrDefault()
+ + ":" + env.getCouchDbPort().getValueOrDefault();
}
public static Request getRequest(String route) {
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
index 81215579a..76b4494c8 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java
@@ -17,8 +17,7 @@
*/
package org.apache.streampipes.user.management.encryption;
-import org.apache.streampipes.commons.constants.DefaultEnvValues;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environments;
import org.jasypt.encryption.StringEncryptor;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
@@ -36,11 +35,11 @@ public class SecretEncryptionManager {
}
private static StringEncryptor getEncryptor() {
+ var env = Environments.getEnvironment();
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
- encryptor.setPassword(Envs.SP_ENCRYPTION_PASSCODE.getValueOrDefault(DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE));
+ encryptor.setPassword(env.getEncryptionPasscode().getValueOrDefault());
encryptor.setIvGenerator(new RandomIvGenerator());
return encryptor;
-
}
}
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
index a17fed07b..4abb09a3c 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.user.management.jwt;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.model.JwtSigningMode;
import org.apache.streampipes.config.backend.model.LocalAuthConfig;
@@ -51,9 +52,11 @@ public class JwtTokenProvider {
public static final String CLAIM_USER = "user";
private static final Logger LOG = LoggerFactory.getLogger(JwtTokenProvider.class);
private BackendConfig config;
+ private Environment env;
public JwtTokenProvider() {
this.config = BackendConfig.INSTANCE;
+ this.env = Environments.getEnvironment();
}
public String createToken(Authentication authentication) {
@@ -117,7 +120,7 @@ public class JwtTokenProvider {
}
private Path getKeyFilePath() {
- return Paths.get(Envs.SP_JWT_PRIVATE_KEY_LOC.getValue());
+ return Paths.get(env.getJwtPrivateKeyLoc().getValue());
}
private LocalAuthConfig authConfig() {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index eb0a49c27..ccc2b9ff5 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.wrapper.standalone.function;
-import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.declarer.IFunctionConfig;
import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer;
@@ -158,8 +159,9 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
private Map<String, SpInputCollector> getInputCollectors(Collection<SpDataStream> streams) throws SpRuntimeException {
Map<String, SpInputCollector> inputCollectors = new HashMap<>();
+ var env = getEnvironment();
for (SpDataStream is : streams) {
- if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) {
+ if (env.getSpDebug().getValueOrDefault()) {
GroundingDebugUtils.modifyGrounding(is.getEventGrounding());
}
inputCollectors.put(is.getElementId(), ProtocolManager.findInputCollector(is.getEventGrounding()
@@ -193,6 +195,10 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
return new SchemaInfo(eventSchema, new ArrayList<>());
}
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+
public abstract IFunctionConfig getFunctionConfig();
public abstract void onServiceStarted(FunctionContext context);