You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/09 15:58:14 UTC

[incubator-streampipes] branch edge-extensions updated: Added Tests and logging

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 1acf63e  Added Tests and logging
1acf63e is described below

commit 1acf63e29c5189b1ee2870d2bd8d128c8ed2cd85
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Jun 9 17:55:22 2021 +0200

    Added Tests and logging
---
 .../logging/evaluation/EvaluationLogger.java       |  65 +++++++++++
 .../model/node/NodeInfoDescriptionBuilder.java     |   1 -
 .../src/main/java/Test/GenericTest.java            | 128 +++++++++++++++++++++
 .../src/main/java/Test/Test.java                   |  24 ++++
 .../EdgeExtensionsGenericPerformanceTest.java      |  40 +++++++
 .../streampipes/performance/TestFactory.java       |  69 +++++++++++
 6 files changed, 326 insertions(+), 1 deletion(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
new file mode 100644
index 0000000..07763d1
--- /dev/null
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.logging.evaluation;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * Singleton that collects evaluation measurements as comma-separated lines in
+ * memory and writes them to the file named by the {@code LoggingFilepath}
+ * environment variable.
+ *
+ * <p>Each line has the form {@code a,b,c,} followed by a newline (note the
+ * trailing comma). This class performs no synchronization.</p>
+ */
+public class EvaluationLogger {
+
+    private static EvaluationLogger instance = null;
+
+    private final File file;
+    // Starts empty: an uninitialised String buffer would put the text "null" in front of the first line.
+    private final StringBuilder buffer = new StringBuilder();
+    // False until the first successful writeOut(); later calls append so earlier batches are kept.
+    private boolean appendOnNextWrite = false;
+
+    /**
+     * Returns the shared logger, creating it on first use.
+     *
+     * @return the single logger instance
+     * @throws RuntimeException if {@code LoggingFilepath} is not set when the instance is created
+     */
+    public static EvaluationLogger getInstance(){
+        if (instance == null) {
+            instance = new EvaluationLogger();
+        }
+        return instance;
+    }
+
+    /**
+     * Resolves the target file from the environment and creates it (including
+     * missing parent directories) if it does not exist yet. A failure to create
+     * the file is only reported on stderr; it will surface again in {@link #writeOut()}.
+     */
+    private EvaluationLogger(){
+        String filepath = System.getenv("LoggingFilepath");
+        if (filepath == null) {
+            throw new RuntimeException("No Logging Location provided.");
+        }
+        this.file = new File(filepath);
+        try {
+            if (!file.exists()) {
+                // getParentFile() is null for a bare file name such as "results.csv".
+                File parent = file.getParentFile();
+                if (parent != null && !parent.exists()) {
+                    parent.mkdirs();
+                }
+                file.createNewFile();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Buffers one line built from the string form of each element, every element
+     * followed by a comma, terminated by a newline.
+     *
+     * @param elements values of the line, in output order
+     */
+    public void addLine(Object[] elements){
+        for (Object element : elements) {
+            buffer.append(element).append(',');
+        }
+        buffer.append('\n');
+    }
+
+    /**
+     * Writes all buffered lines to the log file and clears the buffer.
+     *
+     * <p>The first call replaces the file content; further calls on the same
+     * instance append, so that calling this method repeatedly does not discard
+     * lines written earlier. If writing fails the stack trace is printed and the
+     * buffer is kept.</p>
+     */
+    public void writeOut(){
+        try (FileWriter fw = new FileWriter(file, appendOnNextWrite)){
+            fw.append(buffer);
+            fw.flush();
+            buffer.setLength(0);
+            appendOnNextWrite = true;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
index ba42b91..5a767d9 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streampipes.model.node;
 
-import com.ibm.dtfj.corereaders.zos.dumpreader.AddressRange;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
diff --git a/streampipes-performance-tests/src/main/java/Test/GenericTest.java b/streampipes-performance-tests/src/main/java/Test/GenericTest.java
new file mode 100644
index 0000000..d3b30a4
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/Test/GenericTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package Test;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+
+import java.util.List;
+
+/**
+ * Configurable performance test that drives one existing pipeline through
+ * start, optional migration and/or reconfiguration, and optional stop, and
+ * records operation durations via {@link EvaluationLogger}.
+ */
+public class GenericTest implements Test{
+
+    private final boolean stopPipeline;
+    private final boolean shouldBeMigrated;
+    private final boolean shouldBeReconfigured;
+    private final long timeToSleepBeforeManipulation;
+    private final long timeToSleepAfterManipulation;
+    private final StreamPipesClient client;
+    private final Pipeline pipeline;
+    private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
+
+    /**
+     * Connects to StreamPipes on {@code localhost:8082} using the {@code user}
+     * and {@code apiKey} environment variables and looks up the pipeline to test.
+     *
+     * @param pipelineName                  exact name of the pipeline to operate on
+     * @param stopPipeline                  whether the stop step at the end of {@link #execute()} is enabled
+     * @param shouldBeMigrated              whether the migration step is executed
+     * @param shouldBeReconfigured          whether the reconfiguration step is executed
+     * @param timeToSleepBeforeManipulation pause in milliseconds between start and manipulation
+     * @param timeToSleepAfterManipulation  pause in milliseconds between manipulation and stop
+     * @throws RuntimeException if no pipeline with the given name exists
+     */
+    public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
+                       long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
+        this.stopPipeline = stopPipeline;
+        this.shouldBeMigrated = shouldBeMigrated;
+        this.shouldBeReconfigured = shouldBeReconfigured;
+        this.timeToSleepBeforeManipulation = timeToSleepBeforeManipulation;
+        this.timeToSleepAfterManipulation = timeToSleepAfterManipulation;
+        StreamPipesCredentials credentials = StreamPipesCredentials
+                .from(System.getenv("user"), System.getenv("apiKey"));
+        // Create an instance of the StreamPipes client
+        client = StreamPipesClient
+                .create("localhost", 8082, credentials, true);
+        List<Pipeline> pipelines = client.pipelines().all();
+        pipeline = pipelines.stream()
+                .filter(p -> p.getName().equals(pipelineName))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+    }
+
+    /**
+     * Runs one test iteration: start (if not running), pause, optional migration,
+     * optional reconfiguration, pause, optional stop. Durations of start,
+     * migration and reconfiguration are measured with {@link System#nanoTime()}
+     * and added to the evaluation log.
+     */
+    @Override
+    public void execute() {
+
+        //Start Pipeline
+        if (!pipeline.isRunning()) {
+            long beforeStart = System.nanoTime();
+            PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
+            logDuration("Deployment duration", System.nanoTime() - beforeStart);
+            if (startMessage.isSuccess()) {
+                System.out.println(startMessage.getTitle());
+            }
+        }
+
+        pause(timeToSleepBeforeManipulation);
+
+        //Manipulate Pipeline
+        //Migration
+        if (shouldBeMigrated) {
+            pipeline.getSepas().forEach(p -> {
+                p.setDeploymentTargetNodeId("edge-02.node-controller");
+                p.setDeploymentTargetNodeHostname("edge02.example.de");
+                p.setDeploymentTargetNodePort(7078);
+            });
+            long beforeMigration = System.nanoTime();
+            PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
+            logDuration("Migration duration", System.nanoTime() - beforeMigration);
+            if (migrationMessage.isSuccess()) {
+                System.out.println(migrationMessage.getTitle());
+            }
+        }
+        //Reconfiguration
+        if (shouldBeReconfigured) {
+            pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                    .filter(FreeTextStaticProperty.class::isInstance)
+                    .map(FreeTextStaticProperty.class::cast)
+                    .filter(FreeTextStaticProperty::isReconfigurable)
+                    .filter(sp -> sp.getInternalName().equals("i-am-reconfigurable"))
+                    .forEach(sp -> sp.setValue("999")));
+            // Measured and logged like deployment and migration so this test type also yields data.
+            long beforeReconfiguration = System.nanoTime();
+            PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+            logDuration("Reconfiguration duration", System.nanoTime() - beforeReconfiguration);
+            if (message.isSuccess()) {
+                System.out.println(message.getTitle());
+            }
+        }
+
+        pause(timeToSleepAfterManipulation);
+
+        //Stop Pipeline
+        // NOTE(review): the stop is issued only while the local Pipeline object reports
+        // "not running", i.e. the same state that triggered the start above. Whether
+        // start() refreshes that object is not visible here; confirm this condition is intended.
+        if(stopPipeline && !pipeline.isRunning()){
+            PipelineOperationStatus stopMessage = client.pipelines().stop(pipeline);
+            if(stopMessage.isSuccess()) {
+                System.out.println("Pipeline successfully stopped");
+            }
+        }
+    }
+
+    /** Adds a line of the form {@code <current epoch millis>,<label>,<duration in ns>} to the evaluation log. */
+    private void logDuration(String label, long durationNanos) {
+        evalLogger.addLine(new Object[]{System.currentTimeMillis(), label, durationNanos});
+    }
+
+    /** Sleeps for the given number of milliseconds; on interruption the interrupt flag is restored. */
+    private static void pause(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // Restore the flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/streampipes-performance-tests/src/main/java/Test/Test.java b/streampipes-performance-tests/src/main/java/Test/Test.java
new file mode 100644
index 0000000..706aeed
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/Test/Test.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package Test;
+
+/**
+ * A runnable performance test scenario.
+ */
+public interface Test {
+
+    /** Performs one iteration of the test. */
+    void execute();
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
new file mode 100644
index 0000000..410889f
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import Test.GenericTest;
+import Test.Test;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+
+
+/**
+ * Entry point that runs the test selected through the environment a fixed
+ * number of times and then writes the collected measurements to the
+ * evaluation log.
+ */
+public class EdgeExtensionsGenericPerformanceTest {
+
+    /**
+     * Executes the configured test five times in sequence.
+     *
+     * @param args ignored
+     */
+    public static void main(String ... args){
+
+        final int numberOfRuns = 5;
+
+        Test test = TestFactory.getFromEnv();
+
+        try {
+            for (int run = 1; run <= numberOfRuns; run++) {
+                test.execute();
+            }
+        } finally {
+            // Write out even if a run throws, so measurements of earlier runs are not lost.
+            EvaluationLogger.getInstance().writeOut();
+        }
+
+    }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
new file mode 100644
index 0000000..c2f875a
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import Test.GenericTest;
+import Test.Test;
+
+/**
+ * Creates preconfigured {@link GenericTest} instances. The pipeline to test is
+ * taken from the {@code PipelineName} environment variable.
+ */
+public class TestFactory {
+
+
+    /**
+     * Creates the test named by the {@code TestType} environment variable
+     * ({@code Deployment}, {@code Latency}, {@code Migration} or {@code Reconfiguration}).
+     *
+     * @return the configured test
+     * @throws RuntimeException if {@code TestType} is unset or has an unknown value
+     */
+    public static Test getFromEnv(){
+        String testType = System.getenv("TestType");
+        // switch on a null String throws a bare NullPointerException; fail with a clear message instead.
+        if (testType == null) {
+            throw new RuntimeException("No Test Type provided.");
+        }
+        switch (testType){
+            case "Deployment":
+                return getDeploymentTest();
+            case "Latency":
+                return getLatencyTest();
+            case "Migration":
+                return getMigrationTest();
+            case "Reconfiguration":
+                return getReconfigurationTest();
+            default:
+                throw new RuntimeException("No test configuration found.");
+        }
+    }
+
+
+    /** @return a test without manipulation that pauses 1000 ms before the stop step */
+    public static Test getDeploymentTest(){
+        return new GenericTest(getPipelineName(), true, false,
+                false, 0, 1000);
+    }
+
+    /** @return a test without manipulation that pauses 30000 ms before the stop step */
+    public static Test getLatencyTest(){
+        return new GenericTest(getPipelineName(), true, false,
+                false, 0, 30000);
+    }
+
+    /** @return a test that migrates the pipeline after 15000 ms and pauses 10000 ms afterwards */
+    public static Test getMigrationTest(){
+        return new GenericTest(getPipelineName(), true, true,
+                false, 15000, 10000);
+    }
+
+    /** @return a test that reconfigures the pipeline after 15000 ms and pauses 10000 ms afterwards */
+    public static Test getReconfigurationTest(){
+        return new GenericTest(getPipelineName(), true, false,
+                true, 15000, 10000);
+    }
+
+    //Helpers
+    /** Reads the pipeline name from the environment; throws if {@code PipelineName} is unset. */
+    private static String getPipelineName(){
+        String pipelineName = System.getenv("PipelineName");
+        if (pipelineName == null) {
+            throw new RuntimeException("No Pipeline Name provided.");
+        }
+        return pipelineName;
+    }
+
+}