You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/20 09:03:05 UTC
[incubator-streampipes] 03/03: initial changes to InvocableGraphBuilder
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 3588b8a1849f10e8e0a1497fd18fb12bb9067a31
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Nov 20 10:01:24 2020 +0100
initial changes to InvocableGraphBuilder
---
.../master/management/WorkerRestClient.java | 2 +-
.../streampipes/container/util/ConsulUtil.java | 10 +-
streampipes-node-controller-container/pom.xml | 3 +-
.../{init => }/NodeControllerContainer.java | 5 +-
.../controller/container/config/ConfigKeys.java | 1 -
.../container/management/info/NodeInfoStorage.java | 2 +-
.../management/resource/ResourceManager.java | 52 +++----
.../AbstractNodeContainerResource.java} | 49 ++++---
.../container/rest/DebugRelayResource.java | 55 +++++++
.../HealthCheckResource.java} | 11 +-
.../InfoStatusResource.java} | 14 +-
.../NodeControllerResourceConfig.java} | 18 +--
.../PELifeCycleResource.java} | 81 +++--------
.../http/InvocableEntityUrlGenerator.java | 4 +-
.../manager/matching/InvocationGraphBuilder.java | 161 +++++++++++++++------
.../manager/matching/ProtocolSelector.java | 44 +++---
.../apache/streampipes/vocabulary/StreamPipes.java | 1 +
.../save-pipeline/save-pipeline.component.html | 13 +-
.../save-pipeline/save-pipeline.component.scss | 4 +
ui/src/app/editor/editor.module.ts | 4 +-
20 files changed, 323 insertions(+), 211 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 747fdbb..3d1a4cc 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -263,7 +263,7 @@ public class WorkerRestClient {
private static byte[] getIconAsset(String baseUrl, String appId) throws AdapterException {
String url = baseUrl + "/" + appId + "/assets/icon";
- logger.info("Trying to get icon from endpoint: " + url);
+ logger.debug("Trying to get icon from endpoint: " + url);
try {
byte[] responseString = Request.Get(url)
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index ecfc10e..d90df3d 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -170,8 +170,8 @@ public class ConsulUtil {
}
- public static int getElementEndpointPort(String route) {
- String value = ConsulUtil.getKeyValue(route)
+ public static int getIntValue(String route) {
+ String value = getKeyValue(route)
.values()
.stream()
.findFirst()
@@ -188,8 +188,8 @@ public class ConsulUtil {
return Integer.parseInt(new Gson().fromJson(value, ConfigItem.class).getValue());
}
- public static String getElementEndpointHostname(String route) {
- String value = ConsulUtil.getKeyValue(route)
+ public static String getStringValue(String route) {
+ String value = getKeyValue(route)
.values()
.stream()
.findFirst()
@@ -241,7 +241,7 @@ public class ConsulUtil {
for (ServiceHealth node : nodes) {
if (node.getService().getTags().containsAll(filterByTags)) {
String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
- LOG.info("Active" + serviceGroup + " endpoint:" + endpoint);
+ LOG.info("Active " + serviceGroup + " endpoint: " + endpoint);
endpoints.add(endpoint);
}
}
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index ca770a5..9f274d6 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -100,7 +100,7 @@
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
- <version>4.1.1</version>
+ <version>5.3.6</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
@@ -116,7 +116,6 @@
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging-kafka</artifactId>
<version>0.68.0-SNAPSHOT</version>
- <scope>compile</scope>
</dependency>
</dependencies>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
similarity index 93%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
index 96de524..8210dc4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.init;
+package org.apache.streampipes.node.controller.container;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -18,6 +18,7 @@ package org.apache.streampipes.node.controller.container.init;
*/
import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
import org.apache.streampipes.node.controller.container.management.node.NodeJanitorManager;
@@ -34,7 +35,7 @@ import java.util.Collections;
@Configuration
@EnableAutoConfiguration
-@Import({ NodeControllerContainerResourceConfig.class })
+@Import({ NodeControllerResourceConfig.class })
public class NodeControllerContainer {
private static final Logger LOG =
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
index 1335ee6..e3ea78e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
@@ -31,6 +31,5 @@ public class ConfigKeys {
final static String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
final static String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
final static String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
-
final static String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 63a985c..246dd37 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -210,7 +210,7 @@ public class NodeInfoStorage {
}
private static Long getDiskUsage(FileSystem fs) {
- OSFileStore[] fsArray = fs.getFileStores();
+ List<OSFileStore> fsArray = fs.getFileStores();
long diskTotal = 0L;
for(OSFileStore f : fsArray) {
// has SATA disk
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
index aa7a835..e9b5c31 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
@@ -65,7 +65,7 @@ public class ResourceManager {
new Thread(getCurrentResources, "rm").start();
}
- private Runnable getCurrentResources = () -> {
+ private final Runnable getCurrentResources = () -> {
while(true) {
try {
@@ -94,12 +94,12 @@ public class ResourceManager {
nodeResources.put("cpuLoadInPercent", cpuLoad);
nodeResources.put("cpuTemperature", String.format("%.2f°C", cpuTemperature));
nodeResources.put("cpuTemperatureCelcius", cpuTemperature);
- nodeResources.put("freeMemory", freeMemory);
- nodeResources.put("usedMemory", usedMemory);
- nodeResources.put("totalMemory", totalMemory);
+ nodeResources.put("freeMemoryInBytes", freeMemory);
+ nodeResources.put("usedMemoryInBytes", usedMemory);
+ nodeResources.put("totalMemoryInBytes", totalMemory);
for (Map.Entry<String, Map<String, Long>> k : diskUsage.entrySet()) {
- nodeResources.put("availableDisk", k.getValue().get("available"));
+ nodeResources.put("freeDiskSpaceInBytes", k.getValue().get("usableDiskSpace"));
}
} catch (InterruptedException e) {
@@ -118,67 +118,67 @@ public class ResourceManager {
}
private Map<String, Map<String,Long>> getDiskUsage(FileSystem fs) {
- OSFileStore[] fsArray = fs.getFileStores();
+ List<OSFileStore> fsArray = fs.getFileStores();
Map<String, Map<String, Long>> m = new HashMap<>();
for(OSFileStore f : fsArray) {
Map<String, Long> i = new HashMap<>();
// has SATA disk
if (f.getVolume().contains("/dev/sda")){
- i.put("available", f.getUsableSpace());
- i.put("total", f.getTotalSpace());
+ i.put("usableDiskSpace", f.getUsableSpace());
+ i.put("totalDiskSpace", f.getTotalSpace());
m.put(f.getVolume(), i);
}
else if (f.getVolume().contains("/dev/nvme")){
- i.put("available", f.getUsableSpace());
- i.put("total", f.getTotalSpace());
+ i.put("usableDiskSpace", f.getUsableSpace());
+ i.put("totalDiskSpace", f.getTotalSpace());
m.put(f.getVolume(), i);
}
else if (f.getVolume().contains("/dev/disk")){
- i.put("available", f.getUsableSpace());
- i.put("total", f.getTotalSpace());
+ i.put("usableDiskSpace", f.getUsableSpace());
+ i.put("totalDiskSpace", f.getTotalSpace());
m.put(f.getVolume(), i);
}
// Docker in RPi
else if (f.getVolume().contains("/dev/root")){
- i.put("available", f.getUsableSpace());
- i.put("total", f.getTotalSpace());
+ i.put("usableDiskSpace", f.getUsableSpace());
+ i.put("totalDiskSpace", f.getTotalSpace());
m.put(f.getVolume(), i);
}
// Docker in Jetson Nano
else if (f.getVolume().contains("/dev/mmcblk0p1")){
- i.put("available", f.getUsableSpace());
- i.put("total", f.getTotalSpace());
+ i.put("usableDiskSpace", f.getUsableSpace());
+ i.put("totalDiskSpace", f.getTotalSpace());
m.put(f.getVolume(), i);
}
// // has SATA disk
// if (f.getVolume().contains("/dev/sda") && ( f.getMount().equals("/") || f.getMount().equals("/home"))){
-// i.put("available", f.getUsableSpace());
-// i.put("total", f.getTotalSpace());
+// i.put("usableDiskSpace", f.getUsableSpace());
+// i.put("totalDiskSpace", f.getTotalSpace());
// m.put(f.getVolume(), i);
// }
// // has overlay disk (container setup)
// else if (f.getVolume().equals("overlay") && ( f.getMount().equals("/") || f.getMount().equals("/home"))){
-// i.put("available", f.getUsableSpace());
-// i.put("total", f.getTotalSpace());
+// i.put("usableDiskSpace", f.getUsableSpace());
+// i.put("totalDiskSpace", f.getTotalSpace());
// m.put(f.getVolume(), i);
// }
// // has NVME disk
// else if(f.getVolume().contains("/dev/nvme") && ( f.getMount().equals("/") || f.getMount().equals("/home"))) {
-// i.put("available", f.getUsableSpace());
-// i.put("total", f.getTotalSpace());
+// i.put("usableDiskSpace", f.getUsableSpace());
+// i.put("totalDiskSpace", f.getTotalSpace());
// m.put(f.getVolume(), i);
// }
// // disk on macOS
// else if (f.getVolume().contains("/dev/disk") && f.getMount().equals("/")) {
-// i.put("available", f.getUsableSpace());
-// i.put("total", f.getTotalSpace());
+// i.put("usableDiskSpace", f.getUsableSpace());
+// i.put("totalDiskSpace", f.getTotalSpace());
// m.put(f.getVolume(), i);
// }
}
if (m.isEmpty()) {
Map<String, Long> i = new HashMap<>();
- i.put("available", 0L);
- i.put("total", 0L);
+ i.put("usableDiskSpace", 0L);
+ i.put("totalDiskSpace", 0L);
m.put("n/a", i);
}
return m;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
similarity index 54%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
index 628ef09..86c4444 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.api;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,35 +15,47 @@ package org.apache.streampipes.node.controller.container.api;
* limitations under the License.
*
*/
-import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.model.message.Message;
+import sun.security.provider.certpath.OCSPResponse;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/node")
-public class NodeInfoStatusResource {
+public abstract class AbstractNodeContainerResource {
- @GET
- @Path("/info")
- @Produces(MediaType.APPLICATION_JSON)
- public Response getInfo() {
+ protected <T> Response ok(T entity) {
return Response
.ok()
- .entity(NodeInfoStorage.getInstance().retrieveNodeInfo())
+ .entity(entity)
.build();
}
- @GET
- @Path("/status")
- @Produces(MediaType.APPLICATION_JSON)
- public Response getStatus() {
+ protected <T> Response ok() {
return Response
.ok()
- .entity(ResourceManager.getInstance().retrieveNodeResources())
+ .build();
+ }
+
+ protected <T> Response error(T entity) {
+ return Response
+ .status(500)
+ .entity(entity)
+ .build();
+ }
+
+ protected Response statusMessage(Message message) {
+ return ok(message);
+ }
+
+ protected Response fail() {
+ return Response.serverError().build();
+ }
+
+ protected <T> Response fail(T entity) {
+ return Response
+ .serverError()
+ .entity(entity)
.build();
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
new file mode 100644
index 0000000..0ff1ca6
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -0,0 +1,55 @@
+package org.apache.streampipes.node.controller.container.rest;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+public class DebugRelayResource extends AbstractNodeContainerResource {
+
+ // TODO: Debug-only.
+ @POST
+ @Path("/relay/start")
+ public Response debugRelayEventStream(String msg) throws SpRuntimeException {
+ // TODO implement
+
+ System.out.println(msg);
+ EventRelayManager eventRelayManager = new EventRelayManager();
+ eventRelayManager.start();
+ RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+
+ return ok();
+ }
+
+ @POST
+ @Path("/relay/stop")
+ public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
+ // TODO implement
+
+ System.out.println(msg);
+ EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
+ assert eventRelayManager != null;
+ eventRelayManager.stop();
+
+ return ok();
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
similarity index 75%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
index 06759b3..e4dddf4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.api;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +15,9 @@ package org.apache.streampipes.node.controller.container.api;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -24,14 +26,11 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/")
-public class NodeResource {
+public class HealthCheckResource extends AbstractNodeContainerResource{
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getHealth() {
- return Response
- .ok()
- .status(Response.Status.OK)
- .build();
+ return ok(String.format("hello from node controller: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
similarity index 77%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 628ef09..40b3812 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.node.controller.container.rest;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -26,25 +26,19 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/node")
-public class NodeInfoStatusResource {
+public class InfoStatusResource extends AbstractNodeContainerResource{
@GET
@Path("/info")
@Produces(MediaType.APPLICATION_JSON)
public Response getInfo() {
- return Response
- .ok()
- .entity(NodeInfoStorage.getInstance().retrieveNodeInfo())
- .build();
+ return ok(NodeInfoStorage.getInstance().retrieveNodeInfo());
}
@GET
@Path("/status")
@Produces(MediaType.APPLICATION_JSON)
public Response getStatus() {
- return Response
- .ok()
- .entity(ResourceManager.getInstance().retrieveNodeResources())
- .build();
+ return ok(ResourceManager.getInstance().retrieveNodeResources());
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
similarity index 61%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 2704c0a..68930be 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.init;
+package org.apache.streampipes.node.controller.container.rest;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,18 +17,18 @@ package org.apache.streampipes.node.controller.container.init;
*
*/
-import org.apache.streampipes.node.controller.container.api.NodeControllerResource;
-import org.apache.streampipes.node.controller.container.api.NodeInfoStatusResource;
-import org.apache.streampipes.node.controller.container.api.NodeResource;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.stereotype.Component;
@Component
-public class NodeControllerContainerResourceConfig extends ResourceConfig {
+public class NodeControllerResourceConfig extends ResourceConfig {
- public NodeControllerContainerResourceConfig() {
- register(NodeResource.class);
- register(NodeInfoStatusResource.class);
- register(NodeControllerResource.class);
+ public NodeControllerResourceConfig() {
+ register(HealthCheckResource.class);
+ register(InfoStatusResource.class);
+ register(PELifeCycleResource.class);
+
+ // TODO remove later - only for local relay tests
+ register(DebugRelayResource.class);
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
similarity index 73%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
index fb77e4b..091331b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.api;/*
+package org.apache.streampipes.node.controller.container.rest;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,8 +17,6 @@ package org.apache.streampipes.node.controller.container.api;/*
*/
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.container.declarer.InvocableDeclarer;
-import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.container.transform.Transformer;
import org.apache.streampipes.container.util.Util;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -29,7 +27,6 @@ import org.apache.streampipes.node.controller.container.management.container.Doc
import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,33 +36,41 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
@Path("/node/container")
-public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
+public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
private static final Logger LOG =
- LoggerFactory.getLogger(NodeControllerResource.class.getCanonicalName());
-
+ LoggerFactory.getLogger(PELifeCycleResource.class.getCanonicalName());
private static final String COLON = ":";
+ /**
+ *
+ * @return a list of currently running Docker Containers
+ */
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getPipelineElementContainer(){
- return Response
- .ok()
- .entity(DockerOrchestratorManager.getInstance().list())
- .build();
+ return ok(DockerOrchestratorManager.getInstance().list());
}
+ /**
+ * Deploys a new Docker Container
+ *
+ * @param container to be deployed
+ * @return deployment status
+ */
@POST
@Path("/deploy")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response deployPipelineElementContainer(PipelineElementDockerContainer container) {
- return Response
- .ok()
- .entity(DockerOrchestratorManager.getInstance().deploy(container))
- .build();
+ return ok(DockerOrchestratorManager.getInstance().deploy(container));
}
+ /**
+ * Register pipeline elements in consul
+ * @param message
+ * @return
+ */
@POST
@Path("/register")
@Consumes(MediaType.APPLICATION_JSON)
@@ -81,10 +86,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
// .build();
// client.execute(request);
- return Response
- .ok()
- .status(Response.Status.OK)
- .build();
+ return ok();
}
@POST
@@ -117,9 +119,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
} catch (IOException e) {
e.printStackTrace();
}
- return Response
- .ok()
- .build();
+ return ok();
}
// TODO move endpoint to /elementId/instances/runningInstanceId
@@ -144,9 +144,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
EventRelayManager relay = RunningRelayInstances.INSTANCE.removeRelay(appId);
relay.stop();
- return Response
- .ok()
- .build();
+ return ok();
}
@DELETE
@@ -154,38 +152,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
- return Response
- .ok(DockerOrchestratorManager.getInstance().remove(container))
- .build();
- }
-
- // TODO: Debug-only.
- @POST
- @Path("/relay/start")
- public Response debugRelayEventStream(String msg) throws SpRuntimeException {
- // TODO implement
-
- System.out.println(msg);
- EventRelayManager eventRelayManager = new EventRelayManager();
- eventRelayManager.start();
- RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
-
- return Response
- .ok()
- .build();
+ return ok(DockerOrchestratorManager.getInstance().remove(container));
}
- @POST
- @Path("/relay/stop")
- public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
- // TODO implement
-
- System.out.println(msg);
- EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
- eventRelayManager.stop();
-
- return Response
- .ok()
- .build();
- }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 2999aca..94bd5cb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -93,8 +93,8 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
+ pipelineElement.getDeploymentTargetNodeId()
+ SLASH;
- String host = ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY);
- int port = ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY);
+ String host = ConsulUtil.getStringValue(route + PE_HOST_KEY);
+ int port = ConsulUtil.getIntValue(route + PE_PORT_KEY);
// Necessary because secondary pipeline element description is not stored in backend
// It uses information from primary pipeline element. Node controller will locally forward
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 3f11c1a..80c9acd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.manager.matching;
import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphHelpers;
import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
@@ -27,7 +28,7 @@ import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.schema.EventSchema;
@@ -41,75 +42,97 @@ import java.util.stream.Collectors;
public class InvocationGraphBuilder {
- private PipelineGraph pipelineGraph;
- private String pipelineId;
+ private final PipelineGraph pipelineGraph;
+ private final String pipelineId;
private Integer uniquePeIndex = 0;
-
- private List<InvocableStreamPipesEntity> graphs;
+ private final List<InvocableStreamPipesEntity> graphs;
public InvocationGraphBuilder(PipelineGraph pipelineGraph, String pipelineId) {
this.graphs = new ArrayList<>();
this.pipelineGraph = pipelineGraph;
this.pipelineId = pipelineId;
-
}
public List<InvocableStreamPipesEntity> buildGraphs() {
- List<SpDataStream> streams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
- for (SpDataStream stream : streams) {
- Set<InvocableStreamPipesEntity> connectedElements = getConnections(stream);
- configure(stream, connectedElements);
- }
+ PipelineGraphHelpers
+ .findStreams(pipelineGraph)
+ .forEach(stream -> configure(stream, getConnections(stream)));
return graphs;
}
private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
- EventGrounding inputGrounding = new GroundingBuilder(source, targets)
- .getEventGrounding();
+ EventGrounding inputGrounding = new GroundingBuilder(source, targets).getEventGrounding();
+ // set output stream event grounding for source data processors
if (source instanceof InvocableStreamPipesEntity) {
if (source instanceof DataProcessorInvocation && ((DataProcessorInvocation) source).isConfigured()) {
- DataProcessorInvocation dataProcessorInvocation = (DataProcessorInvocation) source;
- Tuple2<EventSchema, ? extends OutputStrategy> outputSettings;
- OutputSchemaGenerator<?> schemaGenerator = new OutputSchemaFactory(dataProcessorInvocation)
- .getOuputSchemaGenerator();
-
- if (((DataProcessorInvocation) source).getInputStreams().size() == 1) {
- outputSettings = schemaGenerator.buildFromOneStream(dataProcessorInvocation
- .getInputStreams()
- .get(0));
- } else if (graphExists(dataProcessorInvocation.getDOM())) {
- DataProcessorInvocation existingInvocation = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
-
- outputSettings = schemaGenerator.buildFromTwoStreams(existingInvocation
- .getInputStreams().get(0), dataProcessorInvocation.getInputStreams().get(1));
- graphs.remove(existingInvocation);
- } else {
- outputSettings = new Tuple2<>(new EventSchema(), dataProcessorInvocation
- .getOutputStrategies().get(0));
- }
+ DataProcessorInvocation sourceInvokation = (DataProcessorInvocation) source;
- SpDataStream outputStream = new SpDataStream();
- outputStream.setEventGrounding(inputGrounding);
- dataProcessorInvocation.setOutputStrategies(Collections.singletonList(outputSettings.b));
- outputStream.setEventSchema(outputSettings.a);
- ((DataProcessorInvocation) source).setOutputStream(outputStream);
+ Tuple2<EventSchema, ? extends OutputStrategy> outputSettings = getOutputSettings(sourceInvokation);
+ sourceInvokation.setOutputStrategies(Collections.singletonList(outputSettings.b));
+ sourceInvokation.setOutputStream(makeOutputStream(inputGrounding, outputSettings));
}
-
if (!graphExists(source.getDOM())) {
graphs.add((InvocableStreamPipesEntity) source);
}
}
+ // set input stream event grounding for target element data processors and sinks
targets.forEach(t -> {
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(inputGrounding);
+ // check if source and target share same node
+ if (source instanceof InvocableStreamPipesEntity) {
+ if (((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId() != null ||
+ t.getDeploymentTargetNodeId() != null) {
+
+ if (matchingDeploymentTarget((InvocableStreamPipesEntity) source, t)) {
+ // shared grounding
+ // TODO: set event relay to false
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+
+ } else {
+ // check if target runs on cloud or edge node
+ if (t.getDeploymentTargetNodeId().equals("default")) {
+
+ // target runs on cloud node: use central cloud broker, e.g. kafka
+ // TODO: set event relay to true
+ // TODO: add cloud broker to List<EventRelays>
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+
+ } else {
+ // target runs on edge node: use target edge node broker
+ // TODO: set event relay to true
+ // TODO: add target edge node broker to List<EventRelays>
+
+ String broker = getEdgeBroker(t);
+
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+ }
+ }
+ } else {
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+ }
+ } else {
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
+ }
+
+ // old
+// t.getInputStreams()
+// .get(getIndex(source.getDOM(), t))
+// .setEventGrounding(inputGrounding);
t.getInputStreams()
.get(getIndex(source.getDOM(), t))
@@ -128,7 +151,56 @@ public class InvocationGraphBuilder {
configure(t, getConnections(t));
});
+ }
+ /**
+ * Builds the output schema and strategy for a data processor via its OutputSchemaGenerator.
+ * If it does not have exactly one input stream and graphExists() accepts its DOM id, the
+ * invocation returned by find() supplies the first stream and is then removed from graphs.
+ */
+ private Tuple2<EventSchema,? extends OutputStrategy> getOutputSettings(DataProcessorInvocation dataProcessorInvocation) {
+ OutputSchemaGenerator<?> generator = new OutputSchemaFactory(dataProcessorInvocation).getOuputSchemaGenerator();
+
+ if (dataProcessorInvocation.getInputStreams().size() == 1) {
+ return generator.buildFromOneStream(dataProcessorInvocation.getInputStreams().get(0));
+ }
+ if (!graphExists(dataProcessorInvocation.getDOM())) {
+ // no recorded invocation to combine with: empty schema and the first declared output strategy
+ return new Tuple2<>(new EventSchema(), dataProcessorInvocation.getOutputStrategies().get(0));
+ }
+ DataProcessorInvocation recorded = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
+ Tuple2<EventSchema,? extends OutputStrategy> merged = generator.buildFromTwoStreams(
+ recorded.getInputStreams().get(0),
+ dataProcessorInvocation.getInputStreams().get(1));
+ graphs.remove(recorded);
+ return merged;
+ }
+
+ // Creates a new SpDataStream carrying the given grounding and the schema (field a) of the output settings.
+ private SpDataStream makeOutputStream(EventGrounding inputGrounding, Tuple2<EventSchema,? extends OutputStrategy> outputSettings) {
+ SpDataStream stream = new SpDataStream();
+ stream.setEventGrounding(inputGrounding);
+ stream.setEventSchema(outputSettings.a);
+ return stream;
+ }
+
+ // Reads the SP_NODE_BROKER_HOST config value stored under the target's deployment node id.
+ private String getEdgeBroker(InvocableStreamPipesEntity target) {
+ String brokerHostKey = "sp/v1/node/org.apache.streampipes.node.controller/"
+ + target.getDeploymentTargetNodeId() + "/config/SP_NODE_BROKER_HOST";
+ return ConsulUtil.getStringValue(brokerHostKey);
+ }
+
+
+ // True only if both are data processors on the same node; null-safe, as the source node id may be unset.
+ private boolean matchingDeploymentTarget(InvocableStreamPipesEntity source, InvocableStreamPipesEntity target) {
+ if (source instanceof DataProcessorInvocation && target instanceof DataProcessorInvocation
+ && source.getDeploymentTargetNodeId() != null
+ && source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId())) {
+ System.out.println("same node - no relay");
+ return true;
+ }
+ return false;
+ }
private ElementStatusInfoSettings makeStatusInfoSettings(String elementIdentifier) {
@@ -165,13 +237,11 @@ public class InvocationGraphBuilder {
}
private Set<InvocableStreamPipesEntity> getConnections(NamedStreamPipesEntity source) {
- Set<String> outgoingEdges = pipelineGraph.outgoingEdgesOf(source);
- return outgoingEdges
+ return pipelineGraph.outgoingEdgesOf(source)
.stream()
.map(o -> pipelineGraph.getEdgeTarget(o))
.map(g -> (InvocableStreamPipesEntity) g)
.collect(Collectors.toSet());
-
}
private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
@@ -191,5 +261,4 @@ public class InvocationGraphBuilder {
.findFirst()
.get();
}
-
}
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index ec3a891..f38212e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -34,14 +34,15 @@ import java.util.Set;
public class ProtocolSelector extends GroundingSelector {
- private String outputTopic;
- private List<SpProtocol> prioritizedProtocols;
+ private final String outputTopic;
+ private final List<SpProtocol> prioritizedProtocols;
public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
super(source, targets);
this.outputTopic = TopicGenerator.generateRandomTopic();
- this.prioritizedProtocols =
- BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
+ this.prioritizedProtocols = BackendConfig.INSTANCE
+ .getMessagingSettings()
+ .getPrioritizedProtocols();
}
public TransportProtocol getPreferredProtocol() {
@@ -50,36 +51,34 @@ public class ProtocolSelector extends GroundingSelector {
.getEventGrounding()
.getTransportProtocol();
} else {
- for(SpProtocol prioritizedProtocol: prioritizedProtocols) {
- if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) &&
- supportsProtocol(KafkaTransportProtocol.class)) {
- return kafkaTopic();
- }
- else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) &&
- supportsProtocol(JmsTransportProtocol.class)) {
- return jmsTopic();
- } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) &&
- supportsProtocol(MqttTransportProtocol.class)) {
- return mqttTopic();
+ // Walk the protocols in priority order and return the first one that all elements support.
+ // An unknown or unsupported entry is skipped so that lower-priority protocols are still tried.
+ for(SpProtocol p: prioritizedProtocols) {
+ if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
+ return kafkaTransportProtocol();
+ } else if (matches(p, JmsTransportProtocol.class) && supportsProtocol(JmsTransportProtocol.class)) {
+ return jmsTransportProtocol();
+ } else if (matches(p, MqttTransportProtocol.class) && supportsProtocol(MqttTransportProtocol.class)) {
+ return mqttTransportProtocol();
}
}
}
- return kafkaTopic();
+ throw new IllegalArgumentException("Could not get preferred transport protocol");
}
- private TransportProtocol mqttTopic() {
+ private TransportProtocol mqttTransportProtocol() {
return new MqttTransportProtocol(BackendConfig.INSTANCE.getMqttHost(),
BackendConfig.INSTANCE.getMqttPort(),
outputTopic);
}
- private TransportProtocol jmsTopic() {
+ private TransportProtocol jmsTransportProtocol() {
return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
BackendConfig.INSTANCE.getJmsPort(),
outputTopic);
}
- private TransportProtocol kafkaTopic() {
+ private TransportProtocol kafkaTransportProtocol() {
return new KafkaTransportProtocol(BackendConfig.INSTANCE.getKafkaHost(),
BackendConfig.INSTANCE.getKafkaPort(),
outputTopic,
@@ -87,10 +86,12 @@ public class ProtocolSelector extends GroundingSelector {
BackendConfig.INSTANCE.getZookeeperPort());
}
+ private <T extends TransportProtocol> boolean matches(SpProtocol p, Class<T> clazz) {
+ return p.getProtocolClass().equals(clazz.getCanonicalName()); // compare by canonical class name
+ }
- public <T extends TransportProtocol> boolean supportsProtocol(Class<T> protocol) {
+ private <T extends TransportProtocol> boolean supportsProtocol(Class<T> protocol) {
List<InvocableStreamPipesEntity> elements = buildInvocables();
-
return elements
.stream()
.allMatch(e -> e
@@ -98,6 +99,5 @@ public class ProtocolSelector extends GroundingSelector {
.getTransportProtocols()
.stream()
.anyMatch(protocol::isInstance));
-
}
}
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 9a8d673..8897ffa 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -418,4 +418,5 @@ public class StreamPipes {
public static final String PE_CONFIGURED = NS + "isPeConfigured" ;
public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes" ;
+ public static final String HAS_EVENT_STREAM_RELAYS = NS + "hasEventStreamRelays";
}
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index f8c2199..fb900b7 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -49,7 +49,18 @@
<mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
Choose deployment options
</mat-slide-toggle>
- <mat-divider style="margin: 2em 0 2em 0;"></mat-divider>
+ <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
+<!-- <div *ngIf="advancedSettings">-->
+<!-- <div fxFlex="100" fxLayout="row">-->
+<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="center center">-->
+<!-- Event strategy-->
+<!-- </div>-->
+<!-- </div>-->
+<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
+<!-- <mat-slider></mat-slider>-->
+<!-- </div>-->
+<!-- </div>-->
+ <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
<div *ngIf="advancedSettings">
<b>Node selection</b>
<div *ngFor="let processors of pipeline.sepas">
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
index c5dce8d..2f8c6bb 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
@@ -43,4 +43,8 @@
.status-subtext {
font-size: 12pt;
+}
+
+mat-slider {
+ width: 300px;
}
\ No newline at end of file
diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts
index db54354..d44fc86 100644
--- a/ui/src/app/editor/editor.module.ts
+++ b/ui/src/app/editor/editor.module.ts
@@ -59,6 +59,7 @@ import {CustomOutputStrategyComponent} from "./components/output-strategy/custom
import {PropertySelectionComponent} from "./components/output-strategy/property-selection/property-selection.component";
import {UserDefinedOutputStrategyComponent} from "./components/output-strategy/user-defined-output/user-defined-output.component";
import {ConnectModule} from "../connect/connect.module";
+import {MatSliderModule} from "@angular/material/slider";
@NgModule({
imports: [
@@ -75,7 +76,8 @@ import {ConnectModule} from "../connect/connect.module";
FormsModule,
MatProgressSpinnerModule,
ShowdownModule,
- ReactiveFormsModule
+ ReactiveFormsModule,
+ MatSliderModule
],
declarations: [
CompatibleElementsComponent,