You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/28 15:22:25 UTC
[incubator-streampipes] 04/05: [hotfix] Modify Kafka topic of data simulator if debug flag is present
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 1dea25447ba9c7cb8671f193ed14adc7f173b8b4
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Dec 28 16:22:03 2021 +0100
[hotfix] Modify Kafka topic of data simulator if debug flag is present
---
.../streampipes-sources-vehicle-simulator/pom.xml | 15 +++++++++++++++
.../vehicle/simulator/simulator/VehicleDataSimulator.java | 15 +++++++++++++--
.../streampipes-sources-watertank-simulator/pom.xml | 15 +++++++++++++++
.../watertank/simulator/utils/WatertankDataSimulator.java | 15 +++++++++++++--
4 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml b/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
index 11a16c9..37c2fe2 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
@@ -57,6 +57,21 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>embed</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
index f36b0b4..4770c38 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sources.vehicle.simulator.simulator;
import net.acesinc.data.json.generator.config.JSONConfigReader;
import net.acesinc.data.json.generator.config.SimulationConfig;
import net.acesinc.data.json.generator.config.WorkflowConfig;
+import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -40,8 +41,8 @@ public class VehicleDataSimulator implements Runnable {
ConfigExtractor configExtractor = ConfigExtractor.from(DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup());
SimulationConfig config = buildSimulationConfig();
Map<String, TopicAwareWorkflow> workflows = buildSimWorkflows(config);
- String kafkaHost = configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
- Integer kafkaPort = configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+ String kafkaHost = getKafkaHost(configExtractor);
+ Integer kafkaPort = getKafkaPort(configExtractor);
new StreamPipesSimulationRunner(config, workflows, kafkaHost, kafkaPort).startSimulation();
} catch (IOException e) {
@@ -65,6 +66,16 @@ public class VehicleDataSimulator implements Runnable {
return workflows;
}
+ private String getKafkaHost(ConfigExtractor configExtractor) {
+ return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+ "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
+ }
+
+ private Integer getKafkaPort(ConfigExtractor configExtractor) {
+ return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+ 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+ }
+
@Override
public void run() {
initSimulation();
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml b/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
index 7c68924..9d08a8d 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
@@ -57,6 +57,21 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>embed</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
index ad0d0c6..1b94d49 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sources.watertank.simulator.utils;
import net.acesinc.data.json.generator.config.JSONConfigReader;
import net.acesinc.data.json.generator.config.SimulationConfig;
import net.acesinc.data.json.generator.config.WorkflowConfig;
+import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -40,8 +41,8 @@ public class WatertankDataSimulator implements Runnable {
ConfigExtractor configExtractor = ConfigExtractor.from(DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup());
SimulationConfig config = buildSimulationConfig();
Map<String, TopicAwareWorkflow> workflows = buildSimWorkflows(config);
- String kafkaHost = configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
- Integer kafkaPort = configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+ String kafkaHost = getKafkaHost(configExtractor);
+ Integer kafkaPort = getKafkaPort(configExtractor);
new StreamPipesSimulationRunner(config, workflows, kafkaHost, kafkaPort).startSimulation();
} catch (IOException e) {
@@ -57,6 +58,16 @@ public class WatertankDataSimulator implements Runnable {
SimulationConfig.class);
}
+ private String getKafkaHost(ConfigExtractor configExtractor) {
+ return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+ "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
+ }
+
+ private Integer getKafkaPort(ConfigExtractor configExtractor) {
+ return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+ 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+ }
+
private Map<String, TopicAwareWorkflow> buildSimWorkflows(SimulationConfig config) throws IOException {
Map<String, TopicAwareWorkflow> workflows = new HashMap<>();
for(WorkflowConfig workflowConfig : config.getWorkflows()) {