You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/10/13 15:56:39 UTC
[incubator-streampipes] branch edge-extensions updated: [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new e715c66 [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
e715c66 is described below
commit e715c6637c2e3d4dcecd6af9801b1c7f60b7ada5
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Oct 13 17:56:31 2021 +0200
[hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
---
.../streampipes/connect/adapter/Adapter.java | 9 +--
.../connect/adapter/GroundingService.java | 26 ++------
.../connect/adapter/NodeControllerService.java | 74 ++++++++++++++++++++++
.../docker/AbstractStreamPipesDockerContainer.java | 2 +
.../components/pipeline/pipeline.component.ts | 7 +-
ui/src/app/editor/services/jsplumb.service.ts | 24 +++++--
6 files changed, 106 insertions(+), 36 deletions(-)
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 4768e02..e14e3e9 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -168,7 +168,7 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
if (adapterManagedByNodeController(adapterDescription)) {
- if (isEdgeOrFogNodeTarget(adapterDescription)) {
+ if (NodeControllerService.isEdgeOrFogNode(adapterDescription)) {
return createSendToEdgeOrFogBrokerAdapterSink(adapterDescription);
} else {
return createSendToCloudBrokerAdapterSink(adapterDescription);
@@ -253,11 +253,4 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
}
- private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
- return NodeManagement.getInstance().getAllNodes().stream()
- .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
- .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
- n.getStaticNodeMetadata().getType().equals("fog"));
- }
-
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
index 3194326..e864d1f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
@@ -93,7 +93,7 @@ public class GroundingService {
if (adapterManagedByNodeController(adapterDescription)) {
- if (isEdgeOrFogNodeTarget(adapterDescription)) {
+ if (NodeControllerService.isEdgeOrFogNode(adapterDescription)) {
createEdgeOrFogGrounding(eventGrounding, topicDefinition, adapterDescription);
} else {
createCloudProtocol(eventGrounding, topicDefinition);
@@ -111,7 +111,7 @@ public class GroundingService {
private static void createEdgeOrFogGrounding(EventGrounding eventGrounding, TopicDefinition topicDefinition,
AdapterDescription adapterDescription) throws AdapterException {
- String nodeControllerId = extractNodeControllerId(adapterDescription);
+
SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
.getMessagingSettings()
.getPrioritizedEdgeProtocols()
@@ -119,14 +119,14 @@ public class GroundingService {
if (isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
MqttTransportProtocol brokerTransportProtocol =
- (MqttTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+ (MqttTransportProtocol) NodeControllerService.getNodeTransportProtocol(adapterDescription);
brokerTransportProtocol.setTopicDefinition(topicDefinition);
eventGrounding.setTransportProtocol(brokerTransportProtocol);
} else if (isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
KafkaTransportProtocol brokerTransportProtocol =
- (KafkaTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+ (KafkaTransportProtocol) NodeControllerService.getNodeTransportProtocol(adapterDescription);
brokerTransportProtocol.setTopicDefinition(topicDefinition);
eventGrounding.setTransportProtocol(brokerTransportProtocol);
@@ -213,26 +213,8 @@ public class GroundingService {
protocol.setTopicDefinition(topicDefinition);
}
- private static TransportProtocol getNodeBrokerTransportProtocol(String id) {
- Optional<NodeInfoDescription> nodeInfoDescription = getNodeInfoDescriptionForId(id);
- if (nodeInfoDescription.isPresent()) {
- return nodeInfoDescription.get().getNodeBroker().getNodeTransportProtocol();
- }
- throw new SpRuntimeException("Could not find node description for id: " + id);
- }
-
- private static Optional<NodeInfoDescription> getNodeInfoDescriptionForId(String id){
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(id);
- }
-
private static boolean adapterManagedByNodeController(AdapterDescription desc) {
return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
}
- private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
- return NodeManagement.getInstance().getAllNodes().stream()
- .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
- .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
- n.getStaticNodeMetadata().getType().equals("fog"));
- }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java
new file mode 100644
index 0000000..644349a
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.adapter;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class NodeControllerService {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeControllerService.class);
+
+ public static boolean isEdgeOrFogNode(AdapterDescription desc) {
+ String baseUrl = generateBaseUrl(desc);
+ NodeInfoDescription node = getNodeDescription(baseUrl);
+ String nodeType = node.getStaticNodeMetadata().getType();
+ LOG.info("Present node type: " + nodeType);
+ return nodeType.equals("edge") || nodeType.equals("fog");
+ }
+
+ public static TransportProtocol getNodeTransportProtocol(AdapterDescription desc) {
+ String baseUrl = generateBaseUrl(desc);
+ NodeInfoDescription node = getNodeDescription(baseUrl);
+ TransportProtocol transportProtocol = node.getNodeBroker().getNodeTransportProtocol();
+ LOG.info("Present node transport protocol: " + transportProtocol.toString());
+ return transportProtocol;
+ }
+
+ public static NodeInfoDescription getNodeDescription(String baseUrl) {
+ String url = baseUrl + "/api/v2/node/info";
+
+ try {
+ String payload = Request.Get(url)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute()
+ .returnContent()
+ .toString();
+
+ return JacksonSerializer
+ .getObjectMapper()
+ .readValue(payload, NodeInfoDescription.class);
+
+ } catch (IOException e) {
+ LOG.info("Could not connect to " + url);
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ private static String generateBaseUrl(AdapterDescription desc) {
+ return "http://" + desc.getDeploymentTargetNodeHostname() + ":" + desc.getDeploymentTargetNodePort();
+ }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
index c4f63eb..ef79477 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
@@ -38,6 +38,8 @@ public abstract class AbstractStreamPipesDockerContainer {
public List<ContainerEnvVar> generateStreamPipesNodeEnvs() {
return new ArrayList<>(Arrays.asList(
+ toEnv(EnvConfigParam.NODE_TYPE.getEnvironmentKey(),
+ NodeConfiguration.getNodeType()),
toEnv(EnvConfigParam.NODE_CONTROLLER_ID.getEnvironmentKey(),
NodeConfiguration.getNodeControllerId()),
toEnv(EnvConfigParam.NODE_CONTROLLER_CONTAINER_HOST.getEnvironmentKey(),
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 22ad8c0..d60e5bd 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -206,7 +206,12 @@ export class PipelineComponent implements OnInit, OnDestroy {
let pipelineElement: PipelineElementUnion = this.findPipelineElementByElementId(pipelineElementId);
if (ui.draggable.hasClass('draggable-icon')) {
this.EditorService.makePipelineAssemblyEmpty(false);
- var pipelineElementConfig = this.JsplumbService.createNewPipelineElementConfig(pipelineElement, this.PipelineEditorService.getCoordinates(ui, this.currentZoomLevel), false, false);
+ let newElementId = pipelineElement.elementId + ":" + this.JsplumbService.makeId(5);
+ var pipelineElementConfig = this.JsplumbService.createNewPipelineElementConfig(pipelineElement,
+ this.PipelineEditorService.getCoordinates(ui, this.currentZoomLevel),
+ false,
+ false,
+ newElementId);
if ((this.isStreamInPipeline() && pipelineElementConfig.type == 'set') ||
this.isSetInPipeline() && pipelineElementConfig.type == 'stream') {
this.showMixedStreamAlert();
diff --git a/ui/src/app/editor/services/jsplumb.service.ts b/ui/src/app/editor/services/jsplumb.service.ts
index 8bdcf2e..5114b18 100644
--- a/ui/src/app/editor/services/jsplumb.service.ts
+++ b/ui/src/app/editor/services/jsplumb.service.ts
@@ -137,13 +137,14 @@ export class JsplumbService {
createNewPipelineElementConfig(pipelineElement: PipelineElementUnion,
coordinates,
isPreview: boolean,
- isCompleted: boolean): PipelineElementConfig {
+ isCompleted: boolean,
+ newElementId?: string): PipelineElementConfig {
let displaySettings = isPreview ? 'connectable-preview' : 'connectable-editor';
let connectable = "connectable";
let pipelineElementConfig = {} as PipelineElementConfig;
pipelineElementConfig.type = PipelineElementTypeUtils
.toCssShortHand(PipelineElementTypeUtils.fromType(pipelineElement))
- pipelineElementConfig.payload = this.clone(pipelineElement);
+ pipelineElementConfig.payload = this.clone(pipelineElement, newElementId);
pipelineElementConfig.settings = {connectable: connectable,
openCustomize: !(pipelineElement as any).configured,
preview: isPreview,
@@ -163,18 +164,31 @@ export class JsplumbService {
return pipelineElementConfig;
}
- clone(pipelineElement: PipelineElementUnion) {
+ clone(pipelineElement: PipelineElementUnion, newElementId?: string) {
if (pipelineElement instanceof SpDataSet) {
return SpDataSet.fromData(pipelineElement, new SpDataSet());
} else if (pipelineElement instanceof SpDataStream) {
return SpDataStream.fromData(pipelineElement, new SpDataStream());
} else if (pipelineElement instanceof DataProcessorInvocation) {
- return DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
+ let clonedPe = DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
+ if (newElementId) {
+ this.updateElementIds(clonedPe, newElementId)
+ }
+ return clonedPe;
} else {
- return DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
+ let clonedPe = DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
+ if (newElementId) {
+ this.updateElementIds(clonedPe, newElementId);
+ }
+ return clonedPe;
}
}
+ updateElementIds(pipelineElement: PipelineElementUnion, newElementId: string) {
+ pipelineElement.elementId = newElementId;
+ pipelineElement.uri = newElementId;
+ }
+
makeId(count: number) {
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";