You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/28 15:22:25 UTC

[incubator-streampipes] 04/05: [hotfix] Modify Kafka topic of data simulator if debug flag is present

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1dea25447ba9c7cb8671f193ed14adc7f173b8b4
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Dec 28 16:22:03 2021 +0100

    [hotfix] Modify Kafka topic of data simulator if debug flag is present
---
 .../streampipes-sources-vehicle-simulator/pom.xml         | 15 +++++++++++++++
 .../vehicle/simulator/simulator/VehicleDataSimulator.java | 15 +++++++++++++--
 .../streampipes-sources-watertank-simulator/pom.xml       | 15 +++++++++++++++
 .../watertank/simulator/utils/WatertankDataSimulator.java | 15 +++++++++++++--
 4 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml b/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
index 11a16c9..37c2fe2 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/pom.xml
@@ -57,6 +57,21 @@
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <classifier>embed</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <executions>
diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
index f36b0b4..4770c38 100644
--- a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sources.vehicle.simulator.simulator;
 import net.acesinc.data.json.generator.config.JSONConfigReader;
 import net.acesinc.data.json.generator.config.SimulationConfig;
 import net.acesinc.data.json.generator.config.WorkflowConfig;
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -40,8 +41,8 @@ public class VehicleDataSimulator implements Runnable {
       ConfigExtractor configExtractor = ConfigExtractor.from(DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup());
       SimulationConfig config = buildSimulationConfig();
       Map<String, TopicAwareWorkflow> workflows = buildSimWorkflows(config);
-      String kafkaHost = configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
-      Integer kafkaPort = configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+      String kafkaHost = getKafkaHost(configExtractor);
+      Integer kafkaPort = getKafkaPort(configExtractor);
       new StreamPipesSimulationRunner(config, workflows, kafkaHost, kafkaPort).startSimulation();
 
     } catch (IOException e) {
@@ -65,6 +66,16 @@ public class VehicleDataSimulator implements Runnable {
     return workflows;
   }
 
+  private String getKafkaHost(ConfigExtractor configExtractor) {
+    return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+            "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
+  }
+
+  private Integer getKafkaPort(ConfigExtractor configExtractor) {
+    return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+            9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+  }
+
   @Override
   public void run() {
     initSimulation();
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml b/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
index 7c68924..9d08a8d 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/pom.xml
@@ -57,6 +57,21 @@
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <classifier>embed</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <executions>
diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
index ad0d0c6..1b94d49 100644
--- a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
+++ b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sources.watertank.simulator.utils;
 import net.acesinc.data.json.generator.config.JSONConfigReader;
 import net.acesinc.data.json.generator.config.SimulationConfig;
 import net.acesinc.data.json.generator.config.WorkflowConfig;
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner;
@@ -40,8 +41,8 @@ public class WatertankDataSimulator implements Runnable {
       ConfigExtractor configExtractor = ConfigExtractor.from(DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup());
       SimulationConfig config = buildSimulationConfig();
       Map<String, TopicAwareWorkflow> workflows = buildSimWorkflows(config);
-      String kafkaHost = configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
-      Integer kafkaPort = configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+      String kafkaHost = getKafkaHost(configExtractor);
+      Integer kafkaPort = getKafkaPort(configExtractor);
       new StreamPipesSimulationRunner(config, workflows, kafkaHost, kafkaPort).startSimulation();
 
     } catch (IOException e) {
@@ -57,6 +58,16 @@ public class WatertankDataSimulator implements Runnable {
             SimulationConfig.class);
   }
 
+  private String getKafkaHost(ConfigExtractor configExtractor) {
+    return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+            "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST);
+  }
+
+  private Integer getKafkaPort(ConfigExtractor configExtractor) {
+    return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() ?
+            9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT);
+  }
+
   private Map<String, TopicAwareWorkflow> buildSimWorkflows(SimulationConfig config) throws IOException {
     Map<String, TopicAwareWorkflow> workflows = new HashMap<>();
     for(WorkflowConfig workflowConfig : config.getWorkflows()) {