You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/06/09 15:58:14 UTC
[incubator-streampipes] branch edge-extensions updated: Added Tests
and logging
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 1acf63e Added Tests and logging
1acf63e is described below
commit 1acf63e29c5189b1ee2870d2bd8d128c8ed2cd85
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Jun 9 17:55:22 2021 +0200
Added Tests and logging
---
.../logging/evaluation/EvaluationLogger.java | 65 +++++++++++
.../model/node/NodeInfoDescriptionBuilder.java | 1 -
.../src/main/java/Test/GenericTest.java | 128 +++++++++++++++++++++
.../src/main/java/Test/Test.java | 24 ++++
.../EdgeExtensionsGenericPerformanceTest.java | 40 +++++++
.../streampipes/performance/TestFactory.java | 69 +++++++++++
6 files changed, 326 insertions(+), 1 deletion(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
new file mode 100644
index 0000000..07763d1
--- /dev/null
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.logging.evaluation;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class EvaluationLogger {
+
+ private final File file;
+ private String buffer;
+ private static EvaluationLogger instance = null;
+
+ public static EvaluationLogger getInstance(){
+ if(instance==null) instance = new EvaluationLogger();
+ return instance;
+ }
+
+ private EvaluationLogger(){
+ String filepath = System.getenv("LoggingFilepath");
+ if(filepath==null) throw new RuntimeException("No Logging Location provided.");
+ this.file = new File(filepath);
+ try {
+ if(!file.exists()){
+ if(!file.getParentFile().exists()) file.getParentFile().mkdirs();
+ file.createNewFile();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void addLine(Object[] elements){
+ for(Object element:elements)
+ buffer += element + ",";
+ buffer += "\n";
+ }
+
+ public void writeOut(){
+ try (FileWriter fw = new FileWriter(file)){
+ fw.append(buffer);
+ fw.flush();
+ buffer = "";
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
index ba42b91..5a767d9 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
@@ -17,7 +17,6 @@
*/
package org.apache.streampipes.model.node;
-import com.ibm.dtfj.corereaders.zos.dumpreader.AddressRange;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
diff --git a/streampipes-performance-tests/src/main/java/Test/GenericTest.java b/streampipes-performance-tests/src/main/java/Test/GenericTest.java
new file mode 100644
index 0000000..d3b30a4
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/Test/GenericTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package Test;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+
+import java.util.List;
+
+public class GenericTest implements Test{
+
+ private final boolean stopPipeline;
+ private final boolean shouldBeMigrated;
+ private final boolean shouldBeReconfigured;
+ private final long timeToSleepBeforeManipulation;
+ private final long timeToSleepAfterManipulation;
+ private final StreamPipesClient client;
+ private final Pipeline pipeline;
+ private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
+
+ public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
+ long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
+ this.stopPipeline = stopPipeline;
+ this.shouldBeMigrated = shouldBeMigrated;
+ this.shouldBeReconfigured = shouldBeReconfigured;
+ this.timeToSleepBeforeManipulation = timeToSleepBeforeManipulation;
+ this.timeToSleepAfterManipulation = timeToSleepAfterManipulation;
+ StreamPipesCredentials credentials = StreamPipesCredentials
+ .from(System.getenv("user"), System.getenv("apiKey"));
+ // Create an instance of the StreamPipes client
+ client = StreamPipesClient
+ .create("localhost", 8082, credentials, true);
+ List<Pipeline> pipelines = client.pipelines().all();
+ pipeline = pipelines.stream()
+ .filter(p -> p.getName().equals(pipelineName))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+ }
+
+ @Override
+ public void execute() {
+
+ //Start Pipeline
+ if (!pipeline.isRunning()) {
+ long beforeStart = System.nanoTime();
+ PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
+ long deploymentDuration = System.nanoTime() - beforeStart;
+ Object[] line = {System.currentTimeMillis() ,"Deployment duration", deploymentDuration};
+ evalLogger.addLine(line);
+ if (startMessage.isSuccess()) {
+ System.out.println(startMessage.getTitle());
+ }
+ }
+
+ try {
+ Thread.sleep(timeToSleepBeforeManipulation);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ //Manipulate Pipeline
+ //Migration
+ if (shouldBeMigrated) {
+ pipeline.getSepas().forEach(p -> {
+ p.setDeploymentTargetNodeId("edge-02.node-controller");
+ p.setDeploymentTargetNodeHostname("edge02.example.de");
+ p.setDeploymentTargetNodePort(7078);
+ });
+ long beforeMigration = System.nanoTime();
+ PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
+ long migrationDuration = System.nanoTime() - beforeMigration;
+ Object[] line = {System.currentTimeMillis(), "Migration duration", migrationDuration};
+ evalLogger.addLine(line);
+ if (migrationMessage.isSuccess()) {
+ System.out.println(migrationMessage.getTitle());
+ }
+ }
+ //Reconfiguration
+ if (shouldBeReconfigured) {
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+
+ if (sp.getInternalName().equals("i-am-reconfigurable")) {
+ sp.setValue("999");
+ }
+ }));
+ PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+ if (message.isSuccess()) {
+ System.out.println(message.getTitle());
+ }
+ }
+
+ try {
+ Thread.sleep(timeToSleepAfterManipulation);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ //Stop Pipeline
+ if(stopPipeline && !pipeline.isRunning()){
+ PipelineOperationStatus stopMessage = client.pipelines().stop(pipeline);
+ if(stopMessage.isSuccess()) {
+ System.out.println("Pipeline successfully stopped");
+ }
+ }
+ }
+}
diff --git a/streampipes-performance-tests/src/main/java/Test/Test.java b/streampipes-performance-tests/src/main/java/Test/Test.java
new file mode 100644
index 0000000..706aeed
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/Test/Test.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package Test;
+
+public interface Test {
+
+ public void execute();
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
new file mode 100644
index 0000000..410889f
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import Test.GenericTest;
+import Test.Test;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+
+
+public class EdgeExtensionsGenericPerformanceTest {
+
+ public static void main(String ... args){
+
+ int numberOfRuns = 5;
+
+ Test test = TestFactory.getFromEnv();
+
+ for(int i = 1; i <= numberOfRuns; i++){
+ test.execute();
+ }
+ EvaluationLogger.getInstance().writeOut();
+
+ }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
new file mode 100644
index 0000000..c2f875a
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import Test.GenericTest;
+import Test.Test;
+
+public class TestFactory {
+
+
+ public static Test getFromEnv(){
+ switch (System.getenv("TestType")){
+ case "Deployment":
+ return getDeploymentTest();
+ case "Latency":
+ return getLatencyTest();
+ case "Migration":
+ return getMigrationTest();
+ case "Reconfiguration":
+ return getReconfigurationTest();
+ default:
+ throw new RuntimeException("No test configuration found.");
+ }
+ }
+
+
+ public static Test getDeploymentTest(){
+ return new GenericTest(getPipelineName(), true, false,
+ false, 0, 1000);
+ }
+
+ public static Test getLatencyTest(){
+ return new GenericTest(getPipelineName(), true, false,
+ false, 0, 30000);
+ }
+
+ public static Test getMigrationTest(){
+ return new GenericTest(getPipelineName(), true, true,
+ false, 15000, 10000);
+ }
+
+ public static Test getReconfigurationTest(){
+ return new GenericTest(getPipelineName(), true, false,
+ true, 15000, 10000);
+ }
+
+ //Helpers
+ private static String getPipelineName(){
+ String pipelineName = System.getenv("PipelineName");
+ if (pipelineName==null) throw new RuntimeException("No Pipeline Name provided.");
+ return pipelineName;
+ }
+
+}