You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/05/21 12:15:46 UTC

[incubator-streampipes] 02/02: add migration, reconfiguration option to StreamPipes Client for performance tests

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit e76b976792d0f74931e015f97cbdbab0eab54913
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri May 21 14:15:23 2021 +0200

    add migration, reconfiguration option to StreamPipes Client for performance tests
---
 .gitignore                                         |   3 +
 docker-save.sh                                     | 100 +++++++++++++++++++++
 .../streampipes/client/api/AbstractClientApi.java  |   6 ++
 .../apache/streampipes/client/api/PipelineApi.java |  18 ++++
 .../streampipes/client/http/HttpRequest.java       |   6 ++
 .../apache/streampipes/client/http/PutRequest.java |  64 +++++++++++++
 .../client/http/PutRequestWithPayloadResponse.java |  52 +++++++++++
 streampipes-performance-tests/pom.xml              |   6 ++
 .../EdgeExtensionsPerformanceTests.java            |  96 ++++++++++++++++++++
 9 files changed, 351 insertions(+)

diff --git a/.gitignore b/.gitignore
index 3fec30b..5f629f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,9 @@
 # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
 # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
 
+# docker images
+docker-images/
+
 # User-specific stuff:
 .idea/workspace.xml
 .idea/tasks.xml
diff --git a/docker-save.sh b/docker-save.sh
new file mode 100755
index 0000000..45dc594
--- /dev/null
+++ b/docker-save.sh
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/usr/bin/env bash
+# NOTE: the line above is not on line 1 of the script, so it acts as a plain comment only.
+repo=apachestreampipes
+version=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+# Output directory and tar file names for the saved image bundles.
+dir=docker-images
+docker_bundled_edge_tar=bundled-edge-img.tar
+docker_bundled_edge_arm_tar=bundled-edge-img-armv7.tar
+docker_bundled_edge_aarch64_tar=bundled-edge-img-aarch64.tar
+docker_bundled_core_tar=bundled-core-img.tar
+# Edge images without an architecture suffix (saved by "edge" with no second argument).
+docker_img_edge=(
+$repo/node-controller:$version \
+$repo/extensions-all-jvm:$version \
+eclipse-mosquitto:1.6.12 )
+
+# Edge images carrying the -armv7 tag suffix.
+docker_img_edge_arm=(
+$repo/node-controller:$version-armv7 \
+$repo/extensions-all-jvm:$version-armv7 \
+eclipse-mosquitto:1.6.12 )
+# Edge images carrying the -aarch64 tag suffix.
+docker_img_edge_aarch64=(
+$repo/node-controller:$version-aarch64 \
+$repo/extensions-all-jvm:$version-aarch64 \
+eclipse-mosquitto:1.6.12 )
+# Core images: StreamPipes UI, backend, pipeline elements plus third-party services.
+docker_img_core=(
+$repo/ui:$version \
+$repo/backend:$version \
+$repo/pipeline-elements-all-jvm:$version \
+fogsyio/activemq:5.15.9 \
+fogsyio/consul:1.7.1 \
+fogsyio/couchdb:2.3.1 \
+fogsyio/kafka:2.2.0 \
+fogsyio/zookeeper:3.4.13 \
+fogsyio/influxdb:1.7 )
+
+docker_save_bundle(){  # $1: edge | core | anything else (= edge + core); $2: optional edge arch (armv7 | aarch64)
+  echo "Start saving Docker images to tar ..."
+  create_dir_if_not_exists
+  if [ "$1" == "edge" ]; then
+      if [ -z "$2" ]; then
+        echo "Save edge images (amd) to tar ..."
+        docker save "${docker_img_edge[@]}" -o "$dir/$docker_bundled_edge_tar"
+      elif [ "$2" == "armv7" ]; then
+        echo "Save edge images (armv7) to tar ..."
+        docker save "${docker_img_edge_arm[@]}" -o "$dir/$docker_bundled_edge_arm_tar"  # '$' was missing: wrote to a literal file name
+      elif [ "$2" == "aarch64" ]; then
+        echo "Save edge images (aarch64) to tar ..."
+        docker save "${docker_img_edge_aarch64[@]}" -o "$dir/$docker_bundled_edge_aarch64_tar"  # same fix as armv7
+      fi
+  elif [ "$1" == "core" ]; then
+      echo "Save core images to tar ..."
+      docker save "${docker_img_core[@]}" -o "$dir/$docker_bundled_core_tar"
+  else
+      echo "Save all images to tar ..."
+      docker save "${docker_img_edge[@]}" -o "$dir/$docker_bundled_edge_tar"
+      docker save "${docker_img_core[@]}" -o "$dir/$docker_bundled_core_tar"
+  fi
+}
+
+create_dir_if_not_exists(){
+  # Abort instead of carrying on when $dir is occupied by something that is not a directory.
+  if [[ -e $dir && ! -d $dir ]]; then
+      echo "$dir already exists but is not a directory" 1>&2; exit 1
+  fi
+  mkdir -p "$dir"
+}
+
+usage() {  # Print the supported invocations; names must match the values tested in docker_save_bundle.
+  cat <<EOF
+Usage: ./docker-save.sh core
+       ./docker-save.sh edge
+       ./docker-save.sh edge armv7
+       ./docker-save.sh edge aarch64
+EOF
+}
+
+if [ -z "$1" ]; then  # no mode argument: print help instead of saving anything
+  usage
+else
+  docker_save_bundle "$1" "$2"
+fi
\ No newline at end of file
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 563bbca..e9fd627 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -68,6 +68,12 @@ public abstract class AbstractClientApi<T> {
     new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
   }
 
+  protected <O> O put(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
+    ObjectSerializer<T, O> serializer = new ObjectSerializer<>(); // used for the request body and for the reply
+    return new PutRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, object, responseClass)
+            .executeRequest(); // result is typed by responseClass
+  }
+
   protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
     Serializer<Void, O, O> serializer = new ObjectSerializer<>();
     return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 45a0959..f6d5dba 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -100,6 +100,24 @@ public class PipelineApi extends AbstractClientApi<Pipeline> implements CRUDApi<
     return getSingle(getBaseResourcePath().addToPath(pipelineId).addToPath("stop"), PipelineOperationStatus.class);
   }
 
+  public PipelineOperationStatus migrate(Pipeline pipeline) { // delegates using pipeline.getPipelineId()
+    return migrate(pipeline.getPipelineId(), pipeline);
+  }
+
+  private PipelineOperationStatus migrate(String pipelineId, Pipeline pipeline) {
+    StreamPipesApiPath migratePath = getBaseResourcePath().addToPath("migrate").addToPath(pipelineId);
+    return post(migratePath, pipeline, PipelineOperationStatus.class); // pipeline is sent as the payload via post()
+  }
+
+  public PipelineOperationStatus reconfigure(Pipeline pipeline) { // delegates using pipeline.getPipelineId()
+    return reconfigure(pipeline.getPipelineId(), pipeline);
+  }
+
+  private PipelineOperationStatus reconfigure(String pipelineId, Pipeline pipeline) {
+    StreamPipesApiPath reconfigurePath = getBaseResourcePath().addToPath("reconfigure").addToPath(pipelineId);
+    return put(reconfigurePath, pipeline, PipelineOperationStatus.class); // pipeline is sent as the payload via put()
+  }
+
   @Override
   protected StreamPipesApiPath getBaseResourcePath() {
     return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
index 096d30e..485d751 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -62,6 +62,12 @@ public abstract class HttpRequest<SO, DSO, DT> {
     return headers.toArray(new Header[0]);
   }
 
+  protected Header[] standardPutHeaders() {
+    List<Header> putHeaders = new ArrayList<>(Arrays.asList(standardHeaders())); // copy into a growable list
+    putHeaders.add(Headers.contentTypeJson()); // appended after the standard headers
+    return putHeaders.toArray(new Header[0]);
+  }
+
   protected String makeUrl() {
     return makeUrl(true);
   }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
new file mode 100644
index 0000000..2fda50d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+public abstract class PutRequest<SO, DSO, DT> extends HttpRequest<SO, DSO, DT> {
+  // Base class for HTTP PUT requests; a body is attached only when one was supplied.
+  private final SO body;
+  private final boolean withBody;
+
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, DSO, DT> serializer,
+                    SO body) {
+    super(clientConfig, apiPath, serializer);
+    this.body = body;
+    this.withBody = true;
+  }
+  // Body-less variant: makeRequest() will not call addBody().
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, DSO, DT> serializer) {
+    super(clientConfig, apiPath, serializer);
+    this.body = null;
+    this.withBody = false;
+  }
+
+  @Override
+  protected Request makeRequest(Serializer<SO, DSO, DT> serializer) {
+    Request request = Request
+            .Put(makeUrl())
+            .setHeaders(standardPutHeaders());
+    if (withBody) {
+      addBody(request, serializer);
+    }
+
+    return request;
+  }
+  // Serializes the stored body and attaches it to the request as application/json.
+  protected void addBody(Request request, Serializer<SO, DSO, DT> serializer) {
+    request.bodyString(serializer.serialize(body), ContentType.APPLICATION_JSON);
+  }
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java
new file mode 100644
index 0000000..8cfef2a
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+import java.io.IOException;
+
+public class PutRequestWithPayloadResponse<SO, DSO, DT> extends PutRequest<SO, DSO, DT> {
+  // PUT request whose response entity is deserialized with the given responseClass.
+  private final Class<DSO> responseClass;
+
+  public PutRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+                                       StreamPipesApiPath apiPath,
+                                       Serializer<SO, DSO, DT> serializer,
+                                       SO body,
+                                       Class<DSO> responseClass) {
+    super(clientConfig, apiPath, serializer, body);
+    this.responseClass = responseClass;
+  }
+  // Variant without a request body.
+  public PutRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+                                       StreamPipesApiPath apiPath,
+                                       Serializer<SO, DSO, DT> serializer,
+                                       Class<DSO> responseClass) {
+    super(clientConfig, apiPath, serializer);
+    this.responseClass = responseClass;
+  }
+
+  @Override
+  protected DT afterRequest(Serializer<SO, DSO, DT> serializer, HttpEntity entity) throws IOException {
+    return serializer.deserialize(entityAsString(entity), responseClass);
+  }
+}
diff --git a/streampipes-performance-tests/pom.xml b/streampipes-performance-tests/pom.xml
index 24d1c88..2f5887f 100644
--- a/streampipes-performance-tests/pom.xml
+++ b/streampipes-performance-tests/pom.xml
@@ -49,6 +49,12 @@
             <artifactId>streampipes-dataformat-json</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-client</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
 
         <!-- External dependencies -->
     </dependencies>
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java
new file mode 100644
index 0000000..c4723ad
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+
+import java.util.List;
+
+public class EdgeExtensionsPerformanceTests {
+
+    public static void main (String ... args) {
+        StreamPipesCredentials credentials = StreamPipesCredentials
+                .from(System.getenv("user"), System.getenv("apiKey"));
+
+        // Create an instance of the StreamPipes client
+        StreamPipesClient client = StreamPipesClient
+                .create("localhost", 8082, credentials, true);
+
+        boolean shouldBeStopped = true;
+        boolean shouldBeMigrated = false;
+        boolean shouldBeReconfigured = false;
+
+        // Get all pipelines
+        List<Pipeline> pipelines = client.pipelines().all();
+
+        Pipeline pipeline = pipelines.stream()
+                .filter(p -> p.getName().equals("reconfigure"))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+
+        if (!pipeline.isRunning()) {
+            // Start pipeline
+            PipelineOperationStatus message = client.pipelines().start(pipelines.get(0));
+            if (message.isSuccess()) {
+                System.out.println(message.getTitle());
+            }
+        } else {
+            // Stop pipeline
+
+            if (shouldBeStopped) {
+                PipelineOperationStatus message = client.pipelines().stop(pipelines.get(0));
+                if(message.isSuccess()) {
+                    System.out.println("Pipeline successfully stopped");
+                }
+            } else if (shouldBeMigrated) {
+                // Migrate pipeline
+                pipeline.getSepas().forEach(p -> {
+                    p.setDeploymentTargetNodeId("edge-02.node-controller");
+                    p.setDeploymentTargetNodeHostname("edge02.example.de");
+                    p.setDeploymentTargetNodePort(7078);
+                });
+
+                PipelineOperationStatus message = client.pipelines().migrate(pipeline);
+                if (message.isSuccess()) {
+                    System.out.println(message.getTitle());
+                }
+            } else if (shouldBeReconfigured) {
+                // Reconfigure pipeline
+                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                        .filter(FreeTextStaticProperty.class::isInstance)
+                        .map(FreeTextStaticProperty.class::cast)
+                        .filter(FreeTextStaticProperty::isReconfigurable)
+                        .forEach(sp -> {
+
+                            if (sp.getInternalName().equals("i-am-reconfigurable")) {
+                                sp.setValue("999");
+                            }
+                        }));
+
+                PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+                if (message.isSuccess()) {
+                    System.out.println(message.getTitle());
+                }
+            }
+        }
+    }
+}