You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/05/21 12:15:46 UTC
[incubator-streampipes] 02/02: add migration,
reconfiguration option to StreamPipes Client for performance tests
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit e76b976792d0f74931e015f97cbdbab0eab54913
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri May 21 14:15:23 2021 +0200
add migration, reconfiguration option to StreamPipes Client for performance tests
---
.gitignore | 3 +
docker-save.sh | 100 +++++++++++++++++++++
.../streampipes/client/api/AbstractClientApi.java | 6 ++
.../apache/streampipes/client/api/PipelineApi.java | 18 ++++
.../streampipes/client/http/HttpRequest.java | 6 ++
.../apache/streampipes/client/http/PutRequest.java | 64 +++++++++++++
.../client/http/PutRequestWithPayloadResponse.java | 52 +++++++++++
streampipes-performance-tests/pom.xml | 6 ++
.../EdgeExtensionsPerformanceTests.java | 96 ++++++++++++++++++++
9 files changed, 351 insertions(+)
diff --git a/.gitignore b/.gitignore
index 3fec30b..5f629f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,9 @@
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+# docker images
+docker-images/
+
# User-specific stuff:
.idea/workspace.xml
.idea/tasks.xml
diff --git a/docker-save.sh b/docker-save.sh
new file mode 100755
index 0000000..45dc594
--- /dev/null
+++ b/docker-save.sh
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+# NOTE: bash is required; this script uses arrays and [[ ... ]] tests, which plain sh does not provide.
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/usr/bin/env bash
+
+# Image namespace and the version tag used for the StreamPipes images below;
+# the version is read from the Maven project via help:evaluate.
+repo=apachestreampipes
+version=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+
+# Output directory and the tar file name of each bundle.
+dir=docker-images
+docker_bundled_edge_tar=bundled-edge-img.tar
+docker_bundled_edge_arm_tar=bundled-edge-img-armv7.tar
+docker_bundled_edge_aarch64_tar=bundled-edge-img-aarch64.tar
+docker_bundled_core_tar=bundled-core-img.tar
+
+# Edge bundle, images without an architecture suffix.
+docker_img_edge=(
+  $repo/node-controller:$version
+  $repo/extensions-all-jvm:$version
+  eclipse-mosquitto:1.6.12
+)
+
+
+# Edge bundle, images tagged with the -armv7 suffix.
+docker_img_edge_arm=(
+  $repo/node-controller:${version}-armv7
+  $repo/extensions-all-jvm:${version}-armv7
+  eclipse-mosquitto:1.6.12
+)
+
+# Edge bundle, images tagged with the -aarch64 suffix.
+docker_img_edge_aarch64=(
+  $repo/node-controller:${version}-aarch64
+  $repo/extensions-all-jvm:${version}-aarch64
+  eclipse-mosquitto:1.6.12
+)
+
+# Core bundle: StreamPipes core images plus the third-party service images.
+docker_img_core=(
+  $repo/ui:$version
+  $repo/backend:$version
+  $repo/pipeline-elements-all-jvm:$version
+  fogsyio/activemq:5.15.9
+  fogsyio/consul:1.7.1
+  fogsyio/couchdb:2.3.1
+  fogsyio/kafka:2.2.0
+  fogsyio/zookeeper:3.4.13
+  fogsyio/influxdb:1.7
+)
+
+docker_save_bundle(){
+ echo "Start saving Docker images to tar ..."
+ create_dir_if_not_exists
+ if [ "$1" == "edge" ]; then
+ if [ -z "$2" ]; then
+ echo "Save edge images (amd) to tar ..."
+ docker save ${docker_img_edge[@]} -o $dir/$docker_bundled_edge_tar
+ elif [ "$2" == "armv7" ]; then
+ echo "Save edge images (armv7) to tar ..."
+ docker save ${docker_img_edge_arm[@]} -o $dir/docker_bundled_edge_arm_tar
+ elif [ "$2" == "aarch64" ]; then
+ echo "Save edge images (aarch64) to tar ..."
+ docker save ${docker_img_edge_aarch64[@]} -o $dir/docker_bundled_edge_aarch64_tar
+ fi
+ elif [ "$1" == "core" ]; then
+ echo "Save core images to tar ..."
+ docker save ${docker_img_core[@]} -o $dir/$docker_bundled_core_tar
+ else
+ echo "Save all images to tar ..."
+ docker save ${docker_img_edge[@]} -o $dir/$docker_bundled_edge_tar
+ docker save ${docker_img_core[@]} -o $dir/$docker_bundled_core_tar
+ fi
+}
+
+# Ensures the output directory $dir exists. If the path exists but is not a
+# directory the script is aborted with status 1, because every subsequent
+# save into "$dir/..." could not succeed anyway.
+create_dir_if_not_exists(){
+  if [[ ! -e "$dir" ]]; then
+    mkdir -p "$dir"
+  elif [[ ! -d "$dir" ]]; then
+    echo "$dir already exists but is not a directory" 1>&2
+    exit 1
+  fi
+}
+
+# Prints the supported invocations of this script to stdout.
+usage() {
+  cat <<EOF
+Usage: ./docker-save.sh core
+       ./docker-save.sh edge
+       ./docker-save.sh edge armv7
+       ./docker-save.sh edge aarch64
+EOF
+}
+
+# Entry point: print the usage text when no bundle is given, otherwise save it.
+if [ -z "$1" ]; then
+  usage
+else
+  # Quoted so the arguments are passed through without word splitting or globbing.
+  docker_save_bundle "$1" "$2"
+fi
\ No newline at end of file
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 563bbca..e9fd627 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -68,6 +68,12 @@ public abstract class AbstractClientApi<T> {
new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
}
+  /**
+   * Sends {@code object} to {@code apiPath} using a {@link PutRequestWithPayloadResponse} and
+   * returns the result of executing that request.
+   *
+   * @param apiPath       path of the resource the request is sent to
+   * @param object        object handed to the request as its body
+   * @param responseClass class handed to the request for deserializing the response
+   * @param <O>           type of the returned response object
+   * @return the value produced by {@code executeRequest()}
+   */
+  protected <O> O put(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
+    Serializer<T, O, O> objectSerializer = new ObjectSerializer<>();
+    PutRequestWithPayloadResponse<T, O, O> putRequest =
+            new PutRequestWithPayloadResponse<>(clientConfig, apiPath, objectSerializer, object, responseClass);
+    return putRequest.executeRequest();
+  }
+
protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
Serializer<Void, O, O> serializer = new ObjectSerializer<>();
return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 45a0959..f6d5dba 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -100,6 +100,24 @@ public class PipelineApi extends AbstractClientApi<Pipeline> implements CRUDApi<
return getSingle(getBaseResourcePath().addToPath(pipelineId).addToPath("stop"), PipelineOperationStatus.class);
}
+ public PipelineOperationStatus migrate(Pipeline pipeline) {
+ return migrate(pipeline.getPipelineId(), pipeline);
+ }
+
+  /**
+   * Sends the pipeline to the {@code migrate/<pipelineId>} sub-path of the base resource path
+   * through the {@code post} helper.
+   *
+   * @param pipelineId id appended to the request path
+   * @param pipeline   pipeline description sent as payload
+   * @return the operation status returned for the request
+   */
+  private PipelineOperationStatus migrate(String pipelineId, Pipeline pipeline) {
+    StreamPipesApiPath migrationPath = getBaseResourcePath()
+            .addToPath("migrate")
+            .addToPath(pipelineId);
+    return post(migrationPath, pipeline, PipelineOperationStatus.class);
+  }
+
+ public PipelineOperationStatus reconfigure(Pipeline pipeline) {
+ return reconfigure(pipeline.getPipelineId(), pipeline);
+ }
+
+  /**
+   * Sends the pipeline to the {@code reconfigure/<pipelineId>} sub-path of the base resource
+   * path through the {@code put} helper.
+   *
+   * @param pipelineId id appended to the request path
+   * @param pipeline   pipeline description sent as payload
+   * @return the operation status returned for the request
+   */
+  private PipelineOperationStatus reconfigure(String pipelineId, Pipeline pipeline) {
+    StreamPipesApiPath reconfigurationPath = getBaseResourcePath()
+            .addToPath("reconfigure")
+            .addToPath(pipelineId);
+    return put(reconfigurationPath, pipeline, PipelineOperationStatus.class);
+  }
+
@Override
protected StreamPipesApiPath getBaseResourcePath() {
return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
index 096d30e..485d751 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -62,6 +62,12 @@ public abstract class HttpRequest<SO, DSO, DT> {
return headers.toArray(new Header[0]);
}
+  /**
+   * Builds the headers for PUT requests: everything from {@code standardHeaders()} followed by
+   * the JSON content-type header.
+   *
+   * @return a new array holding the standard headers plus the content-type header
+   */
+  protected Header[] standardPutHeaders() {
+    Header[] baseHeaders = standardHeaders();
+    Header[] putHeaders = Arrays.copyOf(baseHeaders, baseHeaders.length + 1);
+    putHeaders[baseHeaders.length] = Headers.contentTypeJson();
+    return putHeaders;
+  }
+
protected String makeUrl() {
return makeUrl(true);
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
new file mode 100644
index 0000000..2fda50d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+/**
+ * Base class for HTTP PUT requests of the StreamPipes client. A request is created either with
+ * a body object, which is serialized and attached as JSON, or without any body.
+ *
+ * @param <SO>  type of the object that is serialized into the request body
+ * @param <DSO> passed through unchanged to {@link HttpRequest} and {@link Serializer}
+ * @param <DT>  passed through unchanged to {@link HttpRequest} and {@link Serializer}
+ */
+public abstract class PutRequest<SO, DSO, DT> extends HttpRequest<SO, DSO, DT> {
+
+  private final SO body;
+  private final boolean withBody;
+
+  /**
+   * Creates a PUT request that carries {@code body} as its payload.
+   */
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, DSO, DT> serializer,
+                    SO body) {
+    this(clientConfig, apiPath, serializer, body, true);
+  }
+
+  /**
+   * Creates a PUT request without a payload.
+   */
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, DSO, DT> serializer) {
+    this(clientConfig, apiPath, serializer, null, false);
+  }
+
+  // Shared initialisation for both public constructors.
+  private PutRequest(StreamPipesClientConfig clientConfig,
+                     StreamPipesApiPath apiPath,
+                     Serializer<SO, DSO, DT> serializer,
+                     SO body,
+                     boolean withBody) {
+    super(clientConfig, apiPath, serializer);
+    this.body = body;
+    this.withBody = withBody;
+  }
+
+  /**
+   * Builds the PUT request for {@code makeUrl()} with the standard PUT headers and, if this
+   * request was created with a body, the serialized body attached.
+   */
+  @Override
+  protected Request makeRequest(Serializer<SO, DSO, DT> serializer) {
+    Request putRequest = Request.Put(makeUrl()).setHeaders(standardPutHeaders());
+
+    if (!withBody) {
+      return putRequest;
+    }
+
+    addBody(putRequest, serializer);
+    return putRequest;
+  }
+
+  /**
+   * Serializes the body object and sets it on {@code request} with content type
+   * {@code application/json}.
+   */
+  protected void addBody(Request request, Serializer<SO, DSO, DT> serializer) {
+    String serializedBody = serializer.serialize(body);
+    request.bodyString(serializedBody, ContentType.APPLICATION_JSON);
+  }
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java
new file mode 100644
index 0000000..8cfef2a
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequestWithPayloadResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+import java.io.IOException;
+
+/**
+ * PUT request whose response entity is read as a string and deserialized into an instance of
+ * the configured response class.
+ *
+ * @param <SO>  type of the object sent as request body
+ * @param <DSO> type described by the response class handed to the serializer
+ * @param <DT>  type returned after deserializing the response
+ */
+public class PutRequestWithPayloadResponse<SO, DSO, DT> extends PutRequest<SO, DSO, DT> {
+
+  private final Class<DSO> responseClass;
+
+  /**
+   * Creates a request that sends {@code body} and deserializes the response using
+   * {@code responseClass}.
+   */
+  public PutRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+                                       StreamPipesApiPath apiPath,
+                                       Serializer<SO, DSO, DT> serializer,
+                                       SO body,
+                                       Class<DSO> responseClass) {
+    super(clientConfig, apiPath, serializer, body);
+    this.responseClass = responseClass;
+  }
+
+  /**
+   * Creates a request without a body that deserializes the response using
+   * {@code responseClass}.
+   */
+  public PutRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+                                       StreamPipesApiPath apiPath,
+                                       Serializer<SO, DSO, DT> serializer,
+                                       Class<DSO> responseClass) {
+    super(clientConfig, apiPath, serializer);
+    this.responseClass = responseClass;
+  }
+
+  /**
+   * Converts the response entity to a string and lets the serializer deserialize it.
+   *
+   * @throws IOException if reading the entity fails
+   */
+  @Override
+  protected DT afterRequest(Serializer<SO, DSO, DT> serializer, HttpEntity entity) throws IOException {
+    String responsePayload = entityAsString(entity);
+    return serializer.deserialize(responsePayload, responseClass);
+  }
+}
diff --git a/streampipes-performance-tests/pom.xml b/streampipes-performance-tests/pom.xml
index 24d1c88..2f5887f 100644
--- a/streampipes-performance-tests/pom.xml
+++ b/streampipes-performance-tests/pom.xml
@@ -49,6 +49,12 @@
<artifactId>streampipes-dataformat-json</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
+ <!-- StreamPipes client, used by EdgeExtensionsPerformanceTests -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-client</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
<!-- External dependencies -->
</dependencies>
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java
new file mode 100644
index 0000000..c4723ad
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsPerformanceTests.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+
+import java.util.List;
+
+/**
+ * Manual driver that exercises pipeline operations of the StreamPipes client (start, stop,
+ * migrate, reconfigure) against a StreamPipes instance reachable at localhost:8082.
+ *
+ * <p>Credentials are read from the environment variables {@code user} and {@code apiKey}.
+ * The pipeline named {@code reconfigure} is looked up; if it is not running it is started,
+ * otherwise the first enabled action flag in {@code main} decides what is done with it.
+ */
+public class EdgeExtensionsPerformanceTests {
+
+  /** Name of the pipeline every operation is applied to. */
+  private static final String PIPELINE_NAME = "reconfigure";
+
+  /**
+   * Runs a single operation on the pipeline named {@code reconfigure}.
+   *
+   * @param args unused
+   * @throws RuntimeException if no pipeline with that name exists
+   */
+  public static void main (String ... args) {
+    StreamPipesCredentials credentials = StreamPipesCredentials
+            .from(System.getenv("user"), System.getenv("apiKey"));
+
+    // Create an instance of the StreamPipes client
+    StreamPipesClient client = StreamPipesClient
+            .create("localhost", 8082, credentials, true);
+
+    // Action applied to an already running pipeline; the first flag set to true wins.
+    boolean shouldBeStopped = true;
+    boolean shouldBeMigrated = false;
+    boolean shouldBeReconfigured = false;
+
+    // Get all pipelines and select the one under test by name
+    List<Pipeline> pipelines = client.pipelines().all();
+
+    Pipeline pipeline = pipelines.stream()
+            // constant-first comparison so a pipeline without a name cannot cause an NPE
+            .filter(p -> PIPELINE_NAME.equals(p.getName()))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Pipeline not found: " + PIPELINE_NAME));
+
+    if (!pipeline.isRunning()) {
+      // Start the selected pipeline (not whichever pipeline happens to be first in the list)
+      PipelineOperationStatus message = client.pipelines().start(pipeline);
+      if (message.isSuccess()) {
+        System.out.println(message.getTitle());
+      }
+    } else if (shouldBeStopped) {
+      // Stop the selected pipeline
+      PipelineOperationStatus message = client.pipelines().stop(pipeline);
+      if (message.isSuccess()) {
+        System.out.println("Pipeline successfully stopped");
+      }
+    } else if (shouldBeMigrated) {
+      // Migrate pipeline: point every processor at the new deployment target
+      pipeline.getSepas().forEach(p -> {
+        p.setDeploymentTargetNodeId("edge-02.node-controller");
+        p.setDeploymentTargetNodeHostname("edge02.example.de");
+        p.setDeploymentTargetNodePort(7078);
+      });
+
+      PipelineOperationStatus message = client.pipelines().migrate(pipeline);
+      if (message.isSuccess()) {
+        System.out.println(message.getTitle());
+      }
+    } else if (shouldBeReconfigured) {
+      // Reconfigure pipeline: change the value of the reconfigurable free-text property
+      pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+              .filter(FreeTextStaticProperty.class::isInstance)
+              .map(FreeTextStaticProperty.class::cast)
+              .filter(FreeTextStaticProperty::isReconfigurable)
+              .filter(sp -> "i-am-reconfigurable".equals(sp.getInternalName()))
+              .forEach(sp -> sp.setValue("999")));
+
+      PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
+      if (message.isSuccess()) {
+        System.out.println(message.getTitle());
+      }
+    }
+  }
+}