You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/10/20 16:05:16 UTC

[incubator-streampipes] branch edge-extensions updated (f037eff -> c5411c2)

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from f037eff  update logger and fix logging inconsistencies
     new d1d1832  increase simulation time for latency test
     new a8ecdf0  Merge remote-tracking branch 'refs/remotes/origin/edge-extensions' into edge-extensions
     new c5411c2  add offloading configuration envs, optionalize target broker alive check

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../node/controller/config/EnvConfigParam.java     |  3 +
 .../node/controller/config/NodeConfiguration.java  | 52 +++++++++++---
 .../strategies/OffloadingStrategyFactory.java      | 49 +++++++++----
 .../relay/bridges/MultiBrokerBridge.java           | 18 ++++-
 .../apache/streampipes/performance/SineSignal.java | 83 ++++++++++++++++++++++
 .../streampipes/performance/TestFactory.java       |  2 +-
 .../performance/performancetest/GenericTest.java   |  4 +-
 7 files changed, 185 insertions(+), 26 deletions(-)
 create mode 100644 streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/SineSignal.java

[incubator-streampipes] 03/03: add offloading configuration envs, optionalize target broker alive check

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c5411c2f6bde5f3d47f282601eba0cc02a4caee7
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Oct 20 18:05:00 2021 +0200

    add offloading configuration envs, optionalize target broker alive check
---
 .../node/controller/config/EnvConfigParam.java     |  3 ++
 .../node/controller/config/NodeConfiguration.java  | 52 ++++++++++++++++++----
 .../strategies/OffloadingStrategyFactory.java      | 49 ++++++++++++++------
 .../relay/bridges/MultiBrokerBridge.java           | 18 +++++++-
 .../performance/performancetest/GenericTest.java   |  4 +-
 5 files changed, 101 insertions(+), 25 deletions(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index f235a5c..c8ecd58 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -44,6 +44,7 @@ public enum EnvConfigParam {
     DOCKER_STATS_COLLECT_FREQ("SP_DOCKER_STATS_COLLECT_FREQ_SECS", "1"),
     RESOURCE_UPDATE_FREQ("SP_NODE_RESOURCE_UPDATE_FREQ_SECS", "30"),
     EVENT_RELAY_BUFFER_SIZE("SP_NODE_EVENT_BUFFER_SIZE", "1000"),
+    EVENT_RELAY_TARGET_BROKER_CHECK_ENABLED("SP_EVENT_RELAY_TARGET_BROKER_CHECK_ENABLED", "true"),
     NODE_GPU_ACCESS("SP_NODE_HAS_GPU", "false"),
     NODE_GPU_CORES("SP_NODE_GPU_CUDA_CORES", "0"),
     NODE_GPU_TYPE("SP_NODE_GPU_TYPE", "n/a"),
@@ -52,6 +53,8 @@ public enum EnvConfigParam {
     SUPPORTED_PIPELINE_ELEMENTS("SP_SUPPORTED_PIPELINE_ELEMENTS", ""),
     AUTO_OFFLOADING("SP_AUTO_OFFLOADING_ACTIVATED", "false"),
     AUTO_OFFLOADING_STRATEGY("SP_AUTO_OFFLOADING_STRATEGY", "default"),
+    AUTO_OFFLOADING_THRESHOLD_IN_PERCENT("SP_AUTO_OFFLOADING_THRESHOLD_IN_PERCENT", "90"),
+    AUTO_OFFLOADING_MAX_NUM_VIOLATIONS("SP_AUTO_OFFLOADING_MAX_NUM_VIOLATIONS", "4"),
     NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes"),
     LOGGING_MQTT_URL("SP_LOGGING_MQTT_URL", "tcp://localhost:1883");
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 4f9dc58..59d402f 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -57,10 +57,13 @@ public final class NodeConfiguration {
     private static int dockerPruningFreqSecs;
     private static int dockerStatsCollectFreqSecs;
     private static int resourceMonitorFreqSecs;
-    private static int relayEventBufferSize;
+    private static int eventRelayBufferSize;
+    private static boolean eventRelayTargetBrokerCheckEnabled;
     private static String consulHost;
     private static boolean autoOffloadingActivated;
     private static OffloadingStrategyType autoOffloadingStrategy;
+    private static float autoOffloadingThresholdInPercent;
+    private static int autoOffloadingMaxNumViolations;
     private static String nodeStoragePath;
     private static String loggingMqttUrl;
 
@@ -258,12 +261,20 @@ public final class NodeConfiguration {
         NodeConfiguration.resourceMonitorFreqSecs = resourceMonitorFreqSecs;
     }
 
-    public static int getRelayEventBufferSize() {
-        return relayEventBufferSize;
+    public static int getEventRelayBufferSize() {
+        return eventRelayBufferSize;
     }
 
-    public static void setRelayEventBufferSize(int relayEventBufferSize) {
-        NodeConfiguration.relayEventBufferSize = relayEventBufferSize;
+    public static void setEventRelayBufferSize(int eventRelayBufferSize) {
+        NodeConfiguration.eventRelayBufferSize = eventRelayBufferSize;
+    }
+
+    public static boolean isEventRelayTargetBrokerCheckEnabled() {
+        return eventRelayTargetBrokerCheckEnabled;
+    }
+
+    public static void setEventRelayTargetBrokerCheckEnabled(boolean eventRelayTargetBrokerCheckEnabled) {
+        NodeConfiguration.eventRelayTargetBrokerCheckEnabled = eventRelayTargetBrokerCheckEnabled;
     }
 
     public static String getConsulHost() {
@@ -302,6 +313,22 @@ public final class NodeConfiguration {
         NodeConfiguration.autoOffloadingStrategy = autoOffloadingStrategy;
     }
 
+    public static float getAutoOffloadingThresholdInPercent() {
+        return autoOffloadingThresholdInPercent;
+    }
+
+    public static void setAutoOffloadingThresholdInPercent(float autoOffloadingThresholdInPercent) {
+        NodeConfiguration.autoOffloadingThresholdInPercent = autoOffloadingThresholdInPercent;
+    }
+
+    public static int getAutoOffloadingMaxNumViolations() {
+        return autoOffloadingMaxNumViolations;
+    }
+
+    public static void setAutoOffloadingMaxNumViolations(int autoOffloadingMaxNumViolations) {
+        NodeConfiguration.autoOffloadingMaxNumViolations = autoOffloadingMaxNumViolations;
+    }
+
     public static String getNodeStoragePath() {
         return nodeStoragePath;
     }
@@ -460,7 +487,6 @@ public final class NodeConfiguration {
                         break;
                     }
                     break;
-
                 case BACKEND_PORT:
                     if (!System.getenv().containsKey(EnvConfigParam.BACKEND_URL.getEnvironmentKey())) {
                         configMap.put(envKey, value);
@@ -468,7 +494,6 @@ public final class NodeConfiguration {
                         break;
                     }
                     break;
-
                 case DOCKER_PRUNING_FREQ:
                     configMap.put(envKey, value);
                     setDockerPruningFreqSecs(Integer.parseInt(value));
@@ -483,8 +508,11 @@ public final class NodeConfiguration {
                     break;
                 case EVENT_RELAY_BUFFER_SIZE:
                     configMap.put(envKey, value);
-                    setRelayEventBufferSize(Integer.parseInt(value));
+                    setEventRelayBufferSize(Integer.parseInt(value));
                     break;
+                case EVENT_RELAY_TARGET_BROKER_CHECK_ENABLED:
+                    configMap.put(envKey, value);
+                    setEventRelayTargetBrokerCheckEnabled(Boolean.parseBoolean(value));
                 case NODE_GPU_ACCESS:
                     configMap.put(envKey, value);
                     setGpuAccelerated(Boolean.parseBoolean(value));
@@ -536,6 +564,14 @@ public final class NodeConfiguration {
                     configMap.put(envKey, value);
                     setAutoOffloadingStrategy(OffloadingStrategyType.fromString(value));
                     break;
+                case AUTO_OFFLOADING_THRESHOLD_IN_PERCENT:
+                    configMap.put(envKey, value);
+                    setAutoOffloadingThresholdInPercent(Float.parseFloat(value));
+                    break;
+                case AUTO_OFFLOADING_MAX_NUM_VIOLATIONS:
+                    configMap.put(envKey, value);
+                    setAutoOffloadingMaxNumViolations(Integer.parseInt(value));
+                    break;
                 case NODE_STORAGE_PATH:
                     configMap.put(envKey, value);
                     setNodeStoragePath(value);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
index 9ca95a9..8e17b6d 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.node.controller.management.offloading.strategies.p
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.CPULoadResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeDiskSpaceResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeMemoryResourceProperty;
-import org.apache.streampipes.node.controller.management.offloading.strategies.selection.PrioritySelectionStrategy;
 import org.apache.streampipes.node.controller.management.offloading.strategies.selection.RandomSelectionStrategy;
 
 import java.util.ArrayList;
@@ -34,6 +33,8 @@ import java.util.List;
 
 public class OffloadingStrategyFactory {
 
+    private static final int HISTORY_QUEUE_SIZE_FACTOR = 5;
+
     public List<OffloadingStrategy<?>> select(){
 
         switch (NodeConfiguration.getAutoOffloadingStrategy()) {
@@ -44,15 +45,23 @@ public class OffloadingStrategyFactory {
             case DISK:
                 return Collections.singletonList(getDefaultDiskSpaceOffloadingPolicy());
             case DEBUG:
-                return Collections.singletonList(new OffloadingStrategy<Float>(
-                        new ThresholdViolationOffloadingPolicy<>(5,
-                                Comparator.GREATER, 0.5f, 1),
-                        new CPULoadResourceProperty(), new RandomSelectionStrategy()));
+                return Collections.singletonList(getDebugOffloadingPolicy());
             default:
                 return getDefaultStrategy();
         }
     }
 
+    private OffloadingStrategy<Float> getDebugOffloadingPolicy() {
+        return new OffloadingStrategy<Float>(
+                new ThresholdViolationOffloadingPolicy<>(
+                        5,
+                        Comparator.GREATER,
+                        0.5f,
+                        1),
+                new CPULoadResourceProperty(),
+                new RandomSelectionStrategy());
+    }
+
     private List<OffloadingStrategy<?>> getDefaultStrategy(){
         List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
         offloadingStrategies.add(getDefaultCPUOffloadingPolicy());
@@ -63,8 +72,13 @@ public class OffloadingStrategyFactory {
 
     private OffloadingStrategy<Float> getDefaultCPUOffloadingPolicy(){
         return new OffloadingStrategy<>(
-                new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 4),
-                new CPULoadResourceProperty(), new RandomSelectionStrategy());
+                new ThresholdViolationOffloadingPolicy<>(
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations() + HISTORY_QUEUE_SIZE_FACTOR,
+                        Comparator.GREATER,
+                        NodeConfiguration.getAutoOffloadingThresholdInPercent(),
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations()),
+                new CPULoadResourceProperty(),
+                new RandomSelectionStrategy());
     }
 
     private OffloadingStrategy<Long> getDefaultMemoryOffloadingPolicy(){
@@ -72,9 +86,13 @@ public class OffloadingStrategyFactory {
                 .getNodeResources().getHardwareResource().getMemory().getMemTotal();
         long threshold = (long) (totalMemory * 0.15);
         return new OffloadingStrategy<>(
-                new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
-                        threshold, 4),
-                new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+                new ThresholdViolationOffloadingPolicy<>(
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations() + HISTORY_QUEUE_SIZE_FACTOR,
+                        Comparator.SMALLER,
+                        threshold,
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations()),
+                new FreeMemoryResourceProperty(),
+                new RandomSelectionStrategy());
     }
 
     private OffloadingStrategy<Long> getDefaultDiskSpaceOffloadingPolicy(){
@@ -82,9 +100,12 @@ public class OffloadingStrategyFactory {
                 .getNodeResources().getHardwareResource().getDisk().getDiskTotal();
         long threshold = (long) (totalDisk * 0.1);
         return new OffloadingStrategy<>(
-                new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
-                        threshold, 4),
-                new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+                new ThresholdViolationOffloadingPolicy<>(
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations() + HISTORY_QUEUE_SIZE_FACTOR,
+                        Comparator.SMALLER,
+                        threshold,
+                        NodeConfiguration.getAutoOffloadingMaxNumViolations()),
+                new FreeDiskSpaceResourceProperty(),
+                new RandomSelectionStrategy());
     }
-
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 9bea9c9..71cb142 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -45,7 +45,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
     private final EventRelayStrategy eventRelayStrategy;
     private final ArrayList<byte[]> eventBuffer = new ArrayList<>();
 
-    private final int EVENT_BUFFER_SIZE = NodeConfiguration.getRelayEventBufferSize();
+    private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
     private final Tuple3<String, Integer, String> relayInfo;
     private boolean isBuffering = false;
 
@@ -79,6 +79,14 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
 
     @Override
     public void publish(byte[] event) {
+        if (NodeConfiguration.isEventRelayTargetBrokerCheckEnabled()) {
+            publishWithCheck(event);
+        } else {
+            publishWithoutCheck(event);
+        }
+    }
+
+    private void publishWithCheck(byte[] event) {
         // check if target broker can be reached
         if (isTargetBrokerAlive()) {
 
@@ -91,7 +99,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                     // TODO: send buffered event should run independent of callback
                     // send buffered events & clear buffer
                     LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
-                            "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
+                                    "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
                             relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
 
                     // add current event from callback
@@ -137,6 +145,12 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         }
     }
 
+    //TODO: Delete after testing
+    private void publishWithoutCheck(byte[] event) {
+        producer.publish(event);
+        metrics.increaseNumRelayedEvents();
+    }
+
     @Override
     public void stop() throws SpRuntimeException {
         LOG.info("Stop event relay to broker={}:{}, topic={}", relayInfo.a, relayInfo.b, relayInfo.c);
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 892a5c6..7b21bb6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -69,6 +69,8 @@ public class GenericTest implements Test{
     public void execute(int nrRuns) {
 
         String testType = System.getenv("TEST_TYPE");
+        String offloadingThreshold = System.getenv("OFFLOADING_THRESHOLD");
+
         Object[] line = null;
         //Start Pipeline
         if (!pipeline.isRunning()) {
@@ -125,7 +127,7 @@ public class GenericTest implements Test{
                         .filter(FreeTextStaticProperty::isReconfigurable)
                         .forEach(sp -> {
                             if (sp.getInternalName().equals("load")) {
-                                sp.setValue(Float.toString(0.9f));
+                                sp.setValue(offloadingThreshold != null ? offloadingThreshold : "0.9");
                             }
                         }));
             line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};

[incubator-streampipes] 02/03: Merge remote-tracking branch 'refs/remotes/origin/edge-extensions' into edge-extensions

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a8ecdf0266edc289264104b2de6f023e7259392d
Merge: d1d1832 f037eff
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Oct 19 21:29:00 2021 +0200

    Merge remote-tracking branch 'refs/remotes/origin/edge-extensions' into edge-extensions

 .../logging/evaluation/EvaluationLogger.java       | 23 ++++++++++++++--------
 .../controller/api/InvocableEntityResource.java    |  6 ++----
 .../offloading/OffloadingPolicyManager.java        | 10 ++++------
 .../ThresholdViolationOffloadingPolicy.java        | 14 +++++++++----
 .../statscollector/DockerStatsCollector.java       |  2 +-
 .../streampipes/performance/TestFactory.java       |  6 +++---
 .../performance/performancetest/GenericTest.java   | 15 ++++++++++++--
 .../pipeline/PipelineMigrationExecutor.java        | 15 +++++---------
 8 files changed, 53 insertions(+), 38 deletions(-)


[incubator-streampipes] 01/03: increase simulation time for latency test

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d1d18321abc0512ecb98129989107509493eff4f
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Oct 14 17:20:48 2021 +0200

    increase simulation time for latency test
---
 .../apache/streampipes/performance/SineSignal.java | 83 ++++++++++++++++++++++
 .../streampipes/performance/TestFactory.java       |  2 +-
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/SineSignal.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/SineSignal.java
new file mode 100644
index 0000000..aebf5c0
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/SineSignal.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+
+import java.util.List;
+
+public class SineSignal {
+
+    public static void main(String ... args) {
+
+        StreamPipesCredentials credentials = StreamPipesCredentials
+                .from("wiener@fzi.de", "lh2pcpVm2WhRy04t8kXxXZAK");
+
+        // Create an instance of the StreamPipes client
+        StreamPipesClient client = StreamPipesClient
+                .create("ipe-zwergwal-01.fzi.de", 80, credentials, true);
+
+        List<Pipeline> pipelines = client.pipelines().all();
+        Pipeline pipeline = pipelines.stream()
+                .filter(p -> p.getName().equals("jet"))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+
+
+        for (int i=0; i<100; i++) {
+            double y = square(i);
+
+            pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                    .filter(FreeTextStaticProperty.class::isInstance)
+                    .map(FreeTextStaticProperty.class::cast)
+                    .filter(FreeTextStaticProperty::isReconfigurable)
+                    .forEach(sp -> {
+                        if (sp.getInternalName().equals("i-am-reconfigurable")) {
+                            sp.setValue(Float.toString((float) y));
+                        }
+                    }));
+
+            System.out.println("Trying to reconfigure with value: " + y);
+            PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+            System.out.println(message.getTitle());
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static double sine(int i){
+        double a = i / 10d;
+        return 3 * Math.sin(a);
+    }
+
+    private static int triangle(int i) {
+        return  Math.abs((i % 6) - 3);
+    }
+
+    private static int square(int i) {
+        return (i % 6) < 3 ? 3 : 0;
+    }
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 951a540..ec9c02d 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -58,7 +58,7 @@ public class TestFactory {
 
     public static Test getLatencyTest(){
         return new GenericTest(getPipelineName(), true, false,
-                false, 0, 30000);
+                false, 0, 600000);
     }
 
     public static Test getMigrationTest(){