You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/01/12 12:12:41 UTC
[incubator-streampipes] 01/02: [WIP] initial work on resource
requirement definition in SDK and core
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 732915a4d0ae68fc5bd10cb6dcaf5c47ef2c0c84
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sun Jan 10 00:37:27 2021 +0100
[WIP] initial work on resource requirement definition in SDK and core
---
.../api/AbstractPipelineElementResource.java | 1 -
.../model/base/ConsumableStreamPipesEntity.java | 19 ++++
.../model/base/InvocableStreamPipesEntity.java | 17 +++
.../model/graph/DataProcessorInvocation.java | 4 +
.../model/graph/DataSinkInvocation.java | 1 +
.../model/graph/PipelineElementContainer.java | 9 ++
.../model/message/NotificationType.java | 3 +
.../model/node/NodeInfoDescription.java | 6 +-
.../streampipes/model/node/meta/GeoLocation.java | 1 +
.../GeoLocation.java => resource/Hardware.java} | 41 +++----
.../NodeResourceRequirement.java} | 47 +++-----
.../org/apache/streampipes/model/util/Cloner.java | 18 ++-
.../controller/container/NodeControllerInit.java | 36 +++---
.../api/DataProcessorPipelineElementResource.java | 14 +--
.../api/DataSinkPipelineElementResource.java | 14 +--
.../container/api/InfoStatusResource.java | 16 +++
.../container/api/InvocableEntityResource.java | 36 +++---
.../api/NodeControllerResourceConfig.java | 3 +-
.../container/management/node/NodeManager.java | 99 ++++++++++++----
.../management/pe/InvocableElementManager.java | 2 +-
...ifeCycle.java => PipelineElementLifeCycle.java} | 2 +-
.../manager/node/AbstractClusterManager.java | 124 +++++++++++++++++++++
.../manager/node/NodeClusterManager.java | 107 ++++++++++--------
.../streampipes/manager/operations/Operations.java | 42 -------
.../manager/setup/CouchDbInstallationStep.java | 3 +-
.../verification/extractor/TypeExtractor.java | 33 ++++--
.../org/apache/streampipes/rest/api/INode.java | 2 +
.../org/apache/streampipes/rest/impl/Node.java | 49 +++++---
.../impl/nouser/PipelineElementImportNoUser.java | 2 +-
.../builder/AbstractProcessingElementBuilder.java | 19 +++-
.../sdk/builder/ProcessingElementBuilder.java | 1 +
.../sdk/builder/ResourceRequirementsBuilder.java | 30 +++--
.../sdk/helpers/CollectedResourceRequirements.java | 23 ++++
...irements.java => NodeResourceRequirements.java} | 10 +-
.../serializers/json/GsonSerializer.java | 2 +
.../jsonld/CustomAnnotationProvider.java | 4 +-
.../streampipes/storage/api/INodeInfoStorage.java | 4 +
.../storage/couchdb/impl/NodeInfoStorageImpl.java | 28 ++++-
.../apache/streampipes/vocabulary/StreamPipes.java | 5 +
.../node-configuration.component.ts | 8 +-
ui/src/app/core-model/gen/streampipes-model.ts | 56 ++++++++--
.../save-pipeline/save-pipeline.component.ts | 4 +-
.../pipeline-overview.component.html | 2 +-
ui/src/app/platform-services/apis/node.service.ts | 14 +++
44 files changed, 683 insertions(+), 278 deletions(-)
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
index 1797897..ba6f338 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
@@ -148,7 +148,6 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
String originalId = desc.getElementId();
String uri = DeclarersSingleton.getInstance().getBaseUri() + type + desc.getElementId();
desc.setElementId(uri);
- desc.setElementId(uri);
// TODO remove after full internationalization support has been implemented
if (desc.isIncludesLocales()) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
index 6faf938..f7936fb 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.model.base;
import io.fogsy.empire.annotations.RdfProperty;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.vocabulary.StreamPipes;
@@ -46,6 +47,11 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
@RdfProperty(StreamPipes.HAS_STATIC_PROPERTY)
protected List<StaticProperty> staticProperties;
+ @OneToMany(fetch = FetchType.EAGER,
+ cascade = {CascadeType.ALL})
+ @RdfProperty(StreamPipes.HAS_NODE_RESOURCE_PROPERTY)
+ protected List<NodeResourceRequirement> resourceRequirements;
+
@OneToOne(fetch = FetchType.EAGER,
cascade = {CascadeType.ALL})
@RdfProperty(StreamPipes.SUPPORTED_GROUNDING)
@@ -64,12 +70,14 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
super();
this.spDataStreams = new ArrayList<>();
this.staticProperties = new ArrayList<>();
+ this.resourceRequirements = new ArrayList<>();
}
public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
super(uri, name, description, iconUrl);
this.spDataStreams = new ArrayList<>();
this.staticProperties = new ArrayList<>();
+ this.resourceRequirements = new ArrayList<>();
}
public ConsumableStreamPipesEntity(ConsumableStreamPipesEntity other) {
@@ -84,6 +92,9 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
if (other.getSupportedGrounding() != null) {
this.supportedGrounding = new EventGrounding(other.getSupportedGrounding());
}
+ if (other.getResourceRequirements() != null) {
+ this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+ }
}
public List<SpDataStream> getSpDataStreams() {
@@ -137,4 +148,12 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
public void setElementEndpointServiceName(String elementEndpointServiceName) {
this.elementEndpointServiceName = elementEndpointServiceName;
}
+
+ public List<NodeResourceRequirement> getResourceRequirements() {
+ return resourceRequirements;
+ }
+
+ public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+ this.resourceRequirements = resourceRequirements;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 8d38c28..457bfe1 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.vocabulary.StreamPipes;
@@ -72,6 +73,11 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
@RdfProperty(StreamPipes.REQUIRES_STREAM)
private List<SpDataStream> streamRequirements;
+ @OneToMany(fetch = FetchType.EAGER,
+ cascade = {CascadeType.ALL})
+ @RdfProperty(StreamPipes.REQUIRES_RESOURCES)
+ protected List<NodeResourceRequirement> resourceRequirements;
+
@RdfProperty(StreamPipes.ELEMENT_ENDPOINT_HOSTNAME)
private String elementEndpointHostname;
@@ -127,6 +133,9 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
if (other.getSupportedGrounding() != null) {
this.supportedGrounding = new EventGrounding(other.getSupportedGrounding());
}
+ if (other.getResourceRequirements() != null) {
+ this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+ }
}
public InvocableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
@@ -279,4 +288,12 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
public void setElementEndpointServiceName(String elementEndpointServiceName) {
this.elementEndpointServiceName = elementEndpointServiceName;
}
+
+ public List<NodeResourceRequirement> getResourceRequirements() {
+ return resourceRequirements;
+ }
+
+ public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+ this.resourceRequirements = resourceRequirements;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 016f3c7..5c3236b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -86,6 +86,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
this.setOutputStreamRelays(sepa.getOutputStreamRelays());
this.setEventRelayStrategy(sepa.getEventRelayStrategy());
+ this.setResourceRequirements(sepa.getResourceRequirements());
//this.setUri(belongsTo +"/" +getElementId());
}
@@ -102,6 +103,9 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
this.pathName = other.getPathName();
this.eventRelayStrategy = other.getEventRelayStrategy();
this.category = new Cloner().epaTypes(other.getCategory());
+ if (other.getResourceRequirements() != null) {
+ this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+ }
}
public DataProcessorInvocation(DataProcessorDescription sepa, String domId) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index 17449f7..b4ced40 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -65,6 +65,7 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity {
this.setElementEndpointServiceName(sec.getElementEndpointServiceName());
this.setElementEndpointHostname(sec.getElementEndpointHostname());
this.setElementEndpointPort(sec.getElementEndpointPort());
+ this.setResourceRequirements(sec.getResourceRequirements());
//this.setUri(belongsTo +"/" +getElementId());
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
index 47561c8..4f031ea 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
@@ -39,6 +39,15 @@ public class PipelineElementContainer extends NamedStreamPipesEntity {
@RdfProperty(StreamPipes.HAS_PIPELINE_ELEMENTS)
private List<NamedStreamPipesEntity> pipelineElementDescriptions;
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_ID)
+ private String deploymentTargetNodeId;
+
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_HOSTNAME)
+ private String deploymentTargetNodeHostname;
+
+ @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_PORT)
+ private Integer deploymentTargetNodePort;
+
public PipelineElementContainer(PipelineElementContainer other) {
super(other);
if (other.getPipelineElementDescriptions() != null) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index e8720c4..4a1c81e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -60,6 +60,9 @@ public enum NotificationType {
REMOVED_ACTION("Action removed", ""),
REMOVED_SOURCE("Source removed", ""),
REMOVED_SEPA("Sepa removed", ""),
+ REMOVED_NODE("Node removed", ""),
+
+ NODE_STATE_UPDATE_ERROR("Node state update error", "Node state could not be changed."),
ADDED_CONFIGURATION("Configuration added", ""),
INSTALLATION_SUCCESSFUL("Installation successful", ""),
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
index 1d87f86..a41db8c 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
@@ -30,12 +30,16 @@ import org.apache.streampipes.vocabulary.StreamPipes;
import javax.persistence.*;
import java.util.List;
+import java.util.UUID;
@RdfsClass(StreamPipes.NODE_INFO_DESCRIPTION)
@Entity
@TsModel
public class NodeInfoDescription extends UnnamedStreamPipesEntity {
+ private static final long serialVersionUID = 4294360613297596807L;
+ protected static final String prefix = "urn:streampipes.org:nid:";
+
@JsonProperty("_id")
private @SerializedName("_id") String id;
@@ -78,7 +82,7 @@ public class NodeInfoDescription extends UnnamedStreamPipesEntity {
private List<String> supportedElements;
public NodeInfoDescription() {
- super();
+ super(prefix + UUID.randomUUID().toString());
}
public NodeInfoDescription(String elementId) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
index 6d2aeef..2ec7053 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
@@ -21,6 +21,7 @@ import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
import org.apache.streampipes.model.shared.annotation.TsModel;
+import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.StreamPipes;
import javax.persistence.Entity;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
similarity index 57%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
index 6d2aeef..1d88228 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
@@ -15,50 +15,43 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.model.node.meta;
+package org.apache.streampipes.model.resource;
+
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
-import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.vocabulary.StreamPipes;
import javax.persistence.Entity;
-
-@RdfsClass(StreamPipes.GEO_LOCATION)
+@RdfsClass(StreamPipes.HARDWARE_REQUIREMENT)
@Entity
@TsModel
-public class GeoLocation extends UnnamedStreamPipesEntity {
-
- @RdfProperty(StreamPipes.GEO_LOCATION_LATITUDE)
- private double latitude;
+public class Hardware extends NodeResourceRequirement {
- @RdfProperty(StreamPipes.GEO_LOCATION_LONGITUDE)
- private double longitude;
+ @RdfProperty(StreamPipes.HAS_GPU_REQUIREMENT)
+ private boolean gpu;
- public GeoLocation() {
+ public Hardware() {
super();
}
- public GeoLocation(double latitude, double longitude) {
- this.latitude = latitude;
- this.longitude = longitude;
- }
-
- public double getLatitude() {
- return latitude;
+ public Hardware(Hardware other) {
+ super(other);
+ this.gpu = other.isGpu();
}
- public void setLatitude(double latitude) {
- this.latitude = latitude;
+ public Hardware(Hardware other, boolean gpu) {
+ super(other);
+ this.gpu = gpu;
}
- public double getLongitude() {
- return longitude;
+ public boolean isGpu() {
+ return gpu;
}
- public void setLongitude(double longitude) {
- this.longitude = longitude;
+ public void setGpu(boolean gpu) {
+ this.gpu = gpu;
}
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
similarity index 56%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
index 6d2aeef..8847434 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
@@ -15,50 +15,35 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.model.node.meta;
+package org.apache.streampipes.model.resource;
-import io.fogsy.empire.annotations.RdfProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.vocabulary.StreamPipes;
-import javax.persistence.Entity;
+import javax.persistence.Entity;
+import javax.persistence.MappedSuperclass;
+import java.util.UUID;
-@RdfsClass(StreamPipes.GEO_LOCATION)
+@RdfsClass(StreamPipes.NODE_RESOURCE_REQUIREMENT)
@Entity
@TsModel
-public class GeoLocation extends UnnamedStreamPipesEntity {
-
- @RdfProperty(StreamPipes.GEO_LOCATION_LATITUDE)
- private double latitude;
-
- @RdfProperty(StreamPipes.GEO_LOCATION_LONGITUDE)
- private double longitude;
-
- public GeoLocation() {
- super();
- }
+@JsonSubTypes({
+ @JsonSubTypes.Type(Hardware.class)
+})
+public abstract class NodeResourceRequirement extends UnnamedStreamPipesEntity {
- public GeoLocation(double latitude, double longitude) {
- this.latitude = latitude;
- this.longitude = longitude;
- }
-
- public double getLatitude() {
- return latitude;
- }
-
- public void setLatitude(double latitude) {
- this.latitude = latitude;
- }
+ private static final long serialVersionUID = -8700750792058131323L;
+ protected static final String prefix = "urn:streampipes.org:nrr:";
- public double getLongitude() {
- return longitude;
+ public NodeResourceRequirement() {
+ super(prefix + UUID.randomUUID().toString());
}
- public void setLongitude(double longitude) {
- this.longitude = longitude;
+ public NodeResourceRequirement(NodeResourceRequirement other) {
+ super(other);
}
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index c02d0ff..26c6642 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -20,8 +20,9 @@ package org.apache.streampipes.model.util;
import org.apache.streampipes.model.SpDataStreamRelay;
import org.apache.streampipes.model.grounding.*;
-import org.apache.streampipes.model.node.NodeBrokerDescription;
import org.apache.streampipes.model.output.*;
+import org.apache.streampipes.model.resource.Hardware;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
import org.apache.streampipes.model.staticproperty.*;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.slf4j.Logger;
@@ -334,4 +335,19 @@ public class Cloner {
return ad;
}
}
+
+ public NodeResourceRequirement cloneResourceRequirements(NodeResourceRequirement o) {
+ if (o instanceof Hardware) {
+ return new Hardware((Hardware) o);
+ } else {
+ LOG.error("Could not clone node resource requirement of type: " + o.getClass().getCanonicalName());
+ return o;
+ }
+ }
+
+ public List<NodeResourceRequirement> resourceRequirements(List<NodeResourceRequirement> nrp) {
+ return nrp.stream()
+ .map(this::cloneResourceRequirements)
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index 48e2e88..ca50136 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.node.controller.container;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.util.ConsulUtil;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
import org.apache.streampipes.node.controller.container.api.NodeControllerResourceConfig;
@@ -50,26 +51,31 @@ public class NodeControllerInit {
app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
app.run();
- LOG.info("Load node info");
+ LOG.info("Load node info description");
NodeManager.getInstance().init();
- LOG.info("Start Node resource manager");
- ResourceManager.getInstance().run();
+ LOG.info("Register node controller at backend");
+ boolean success = NodeManager.getInstance().register();
- if (!"true".equals(System.getenv("SP_DEBUG"))) {
- LOG.info("Auto-deploy StreamPipes node container");
- DockerContainerManager.getInstance().init();
+ if (success) {
+ LOG.info("Start resource manager");
+ ResourceManager.getInstance().run();
- LOG.info("Start Janitor manager");
- JanitorManager.getInstance().run();
- }
+ if (!"true".equals(System.getenv("SP_DEBUG"))) {
+ LOG.info("Auto-deploy extensions container");
+ DockerContainerManager.getInstance().init();
- // registration with consul here
- ConsulUtil.registerNodeService(
- conf.getNodeServiceId(),
- conf.getNodeHostName(),
- conf.getNodeControllerPort()
- );
+ LOG.info("Start janitor manager");
+ JanitorManager.getInstance().run();
+ }
+
+ // registration with consul here
+ ConsulUtil.registerNodeService(
+ conf.getNodeServiceId(),
+ conf.getNodeHostName(),
+ conf.getNodeControllerPort()
+ );
+ } else throw new SpRuntimeException("Could not register node controller at backend");
}
@PreDestroy
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
index 6e5ebb0..bb186e8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
@@ -21,10 +21,10 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import javax.ws.rs.Path;
-@Path("/api/v2/node/element/sepa")
-public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
-
- public DataProcessorPipelineElementResource() {
- super(DataProcessorInvocation.class);
- }
-}
+//@Path("/api/v2/node/element/sepa")
+//public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
+//
+// public DataProcessorPipelineElementResource() {
+// super(DataProcessorInvocation.class);
+// }
+//}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
index af99b25..c404896 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
@@ -21,10 +21,10 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
import javax.ws.rs.Path;
-@Path("/api/v2/node/element/sec")
-public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-
- public DataSinkPipelineElementResource() {
- super(DataSinkInvocation.class);
- }
-}
+//@Path("/api/v2/node/element/sec")
+//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
+//
+// public DataSinkPipelineElementResource() {
+// super(DataSinkInvocation.class);
+// }
+//}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
index b3a01f2..7e7b36a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
@@ -30,6 +30,9 @@ import javax.ws.rs.core.Response;
@Path("/api/v2/node/info")
public class InfoStatusResource extends AbstractResource {
+ private static final String ACTIVATE = "activate";
+ private static final String DEACTIVATE = "deactivate";
+
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getNodeInfo() {
@@ -43,6 +46,19 @@ public class InfoStatusResource extends AbstractResource {
return ok(NodeManager.getInstance().updateNodeInfoDescription(desc));
}
+ @POST
+ @Path("{action}")
+ @JacksonSerialized
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response activateNode(@PathParam("action") String action) {
+ if (action.equals(ACTIVATE)) {
+ return ok(NodeManager.getInstance().activate());
+ } else if (action.equals(DEACTIVATE)) {
+ return ok(NodeManager.getInstance().deactivate());
+ } else return fail();
+ }
+
@GET
@Path("/resources")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 0c71b57..2b779c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -23,10 +23,10 @@ import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,31 +34,30 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
-public abstract class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractResource {
+@Path("/api/v2/node/element")
+public class InvocableEntityResource extends AbstractResource {
private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
-
private static final String SLASH = "/";
-
- protected Class<I> clazz;
-
- public InvocableEntityResource(Class<I> clazz) {
- this.clazz = clazz;
- }
+ private static final String DATA_PROCESSOR_PREFIX = "sepa";
+ private static final String DATA_SINK_PREFIX = "sec";
@POST
@Path("/register")
+ @JacksonSerialized
+ @Consumes(MediaType.APPLICATION_JSON)
public void register(InvocableRegistration registration) {
InvocableElementManager.getInstance().register(registration);
}
@POST
- @Path("{elementId}")
+ @Path("{identifier}/{elementId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response invoke(@PathParam("elementId") String elementId, I graph) {
+ public javax.ws.rs.core.Response invoke(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId, InvocableStreamPipesEntity graph) {
String endpoint;
- if (graph instanceof DataProcessorInvocation) {
+ if (identifier.equals(DATA_PROCESSOR_PREFIX)) {
endpoint = graph.getBelongsTo();
DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
@@ -69,7 +68,7 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
}
// Currently no data sinks are registered at node controller. If we, at some point, want to also run data
// sinks on edge nodes we need to register there Declarer at the node controller one startup.
- else if (graph instanceof DataSinkInvocation) {
+ else if (identifier.equals(DATA_SINK_PREFIX)) {
endpoint = graph.getBelongsTo();
Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
if (resp.isSuccess()) {
@@ -82,12 +81,11 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
}
@DELETE
- @Path("{elementId}/{runningInstanceId}")
+ @Path("{identifier}/{elementId}/{runningInstanceId}")
@Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response detach(@PathParam("elementId") String elementId,
- @PathParam("runningInstanceId") String runningInstanceId) {
- LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
-
+ public javax.ws.rs.core.Response detach(@PathParam("identifier") String identifier,
+ @PathParam("elementId") String elementId,
+ @PathParam("runningInstanceId") String runningInstanceId) {
String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -96,7 +94,7 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
return ok(resp);
}
- private String toJson(I graph) {
+ private String toJson(InvocableStreamPipesEntity graph) {
try {
return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
} catch (JsonProcessingException e) {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 8db6aba..5fcd3dc 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -26,8 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
public NodeControllerResourceConfig() {
register(HealthCheckResource.class);
register(InfoStatusResource.class);
- register(DataProcessorPipelineElementResource.class);
- register(DataSinkPipelineElementResource.class);
+ register(InvocableEntityResource.class);
register(AdapterDataStreamRelayResource.class);
register(ConnectResource.class);
register(ContainerResource.class);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index c468440..a597a03 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -18,7 +18,12 @@
package org.apache.streampipes.node.controller.container.management.node;
import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.node.*;
import org.apache.streampipes.model.node.meta.GeoLocation;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
@@ -31,6 +36,14 @@ import java.io.IOException;
public class NodeManager {
private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
+ private static final String PROTOCOL = "http://";
+ private static final String SLASH = "/";
+ private static final String COLON = ":";
+ private static final String BACKEND_BASE_ROUTE = "/streampipes-backend";
+ private static final String NODE_REGISTRATION_ROUTE = "/api/v2/users/admin@streampipes.org/nodes";
+ private static final long RETRY_INTERVAL_MS = 5000;
+ private static final int CONNECT_TIMEOUT_MS = 10000;
+
private NodeInfoDescription nodeInfo = new NodeInfoDescription();
private static NodeManager instance = null;
@@ -78,38 +91,76 @@ public class NodeManager {
.build())
.build();
- NodeManager.getInstance().add(nodeInfoDescription);
-
- register(nodeInfoDescription);
+ add(nodeInfoDescription);
}
- private void register(NodeInfoDescription desc) {
-
- String url =
- "http://"
- + NodeControllerConfig.INSTANCE.getBackendHost()
- + ":"
- + NodeControllerConfig.INSTANCE.getBackendPort()
- + "/"
- + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes";
-
+ public boolean register() {
+ boolean connected = false;
try {
- String nodeInfoDescription = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-
- Request.Post(url)
- .bodyString(nodeInfoDescription, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
-
+ String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
+ String endpoint = generateEndpoint();
+
+ LOG.info("Trying to register node at backend: " + endpoint);
+
+ while (!connected) {
+ connected = post(endpoint, body);
+ if (!connected) {
+ LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("Successfully registered node at backend");
} catch (IOException e) {
- LOG.info("Could not connect to " + url);
+ e.printStackTrace();
}
+ return connected;
}
- public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
+ public Message updateNodeInfoDescription(NodeInfoDescription desc) {
LOG.info("Update node description for node controller: {}", desc.getNodeControllerId());
this.nodeInfo = desc;
- return true;
+ return Notifications.success(NotificationType.OPERATION_SUCCESS);
+ }
+
+ /**
+  * Sets the active flag of the locally held node description to {@code true}.
+  *
+  * @return a success notification of type {@code OPERATION_SUCCESS}
+  */
+ public Message activate() {
+   // The log text now matches the operation; it previously read "Deactivate".
+   LOG.info("Activate node controller");
+   this.nodeInfo.setActive(true);
+   return Notifications.success(NotificationType.OPERATION_SUCCESS);
+ }
+
+ /**
+  * Sets the active flag of the locally held node description to {@code false}.
+  *
+  * @return a success notification of type {@code OPERATION_SUCCESS}
+  */
+ public Message deactivate() {
+   // The log text now matches the operation; it previously read "Activate".
+   LOG.info("Deactivate node controller");
+   this.nodeInfo.setActive(false);
+   return Notifications.success(NotificationType.OPERATION_SUCCESS);
+ }
+
+ /**
+  * Sends {@code body} as JSON to {@code endpoint} via HTTP POST and evaluates the reply.
+  *
+  * @param endpoint target URL
+  * @param body JSON payload
+  * @return whatever {@code handleResponse} derives from the reply
+  * @throws IOException if executing the request or reading the reply fails; the exception is
+  *     not handled here and propagates to the caller
+  */
+ private boolean post(String endpoint, String body) throws IOException {
+   Request request = Request.Post(endpoint)
+       .bodyString(body, ContentType.APPLICATION_JSON)
+       .connectTimeout(CONNECT_TIMEOUT_MS);
+   return handleResponse(request.execute());
+ }
+
+ /**
+  * Reads the reply body as a string, deserializes it into a {@code SuccessMessage} and
+  * returns its success flag.
+  *
+  * @param response reply of a previously executed request
+  * @return the value of {@code isSuccess()} of the deserialized message
+  * @throws IOException if the content cannot be read or parsed
+  */
+ private boolean handleResponse(Response response) throws IOException {
+   String json = response.returnContent().asString();
+   return JacksonSerializer.getObjectMapper().readValue(json, SuccessMessage.class).isSuccess();
+ }
+
+ /**
+  * Builds the URL at which this node controller registers itself.
+  *
+  * @return {@code http://<backendHost>:<backendPort>} followed by the backend base route and
+  *     the node registration route
+  */
+ private String generateEndpoint() {
+   // BACKEND_BASE_ROUTE already starts with "/", so no extra separator is inserted here;
+   // prepending SLASH produced "host:port//streampipes-backend/...".
+   return PROTOCOL
+       + NodeControllerConfig.INSTANCE.getBackendHost()
+       + COLON
+       + NodeControllerConfig.INSTANCE.getBackendPort()
+       + BACKEND_BASE_ROUTE
+       + NODE_REGISTRATION_ROUTE;
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 692f3b4..4db36b3 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
-public class InvocableElementManager implements InvocableLifeCycle {
+public class InvocableElementManager implements PipelineElementLifeCycle {
private static final Logger LOG =
LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 168d998..0ad32ee 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.node.controller.container.management.pe;
import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.model.Response;
-public interface InvocableLifeCycle {
+public interface PipelineElementLifeCycle {
void register(InvocableRegistration registration);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
new file mode 100644
index 0000000..b5803d5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.node;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Base class with the HTTP plumbing used to push node description and state changes to a
+ * remote node controller.
+ *
+ * <p>Both sync methods retry every {@code RETRY_INTERVAL_MS} milliseconds until the request
+ * could be sent, so they block the calling thread while the node controller is unreachable.
+ */
+public abstract class AbstractClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
+
+  private static final String PROTOCOL = "http://";
+  private static final String COLON = ":";
+  private static final long RETRY_INTERVAL_MS = 5000;
+  private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
+  private static final int CONNECT_TIMEOUT = 1000;
+
+  /**
+   * Tells the node controller described by {@code desc} to activate or deactivate itself.
+   *
+   * @param desc description providing hostname and port of the node controller
+   * @param activate {@code true} to call the activate route, {@code false} for deactivate
+   * @return {@code true} once the request was sent; {@code false} if the calling thread was
+   *     interrupted while waiting for a retry
+   */
+  protected static boolean syncStateUpdateWithRemoteNodeController(NodeInfoDescription desc, boolean activate) {
+    String url = generateEndpoint(desc, activate ? "/activate" : "/deactivate");
+    LOG.info("Trying to sync state update with node controller=" + url);
+    return retryUntilConnected(() -> post(url));
+  }
+
+  /**
+   * Sends the serialized description to the node controller it describes.
+   *
+   * @param desc description to serialize and send; also provides hostname and port
+   * @return {@code true} once the request was sent; {@code false} if the description could not
+   *     be serialized or the calling thread was interrupted while waiting for a retry
+   */
+  protected static boolean syncWithRemoteNodeController(NodeInfoDescription desc) {
+    try {
+      String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+      String url = generateEndpoint(desc);
+      LOG.info("Trying to sync description updates with node controller=" + url);
+      return retryUntilConnected(() -> put(url, body));
+    } catch (IOException e) {
+      LOG.error("Could not serialize node description", e);
+      return false;
+    }
+  }
+
+  /**
+   * Repeats {@code attempt} every {@code RETRY_INTERVAL_MS} milliseconds until it reports success.
+   *
+   * @param attempt the request to execute; returns {@code true} when it went through
+   * @return {@code true} after a successful attempt, {@code false} if interrupted while sleeping
+   */
+  private static boolean retryUntilConnected(java.util.function.BooleanSupplier attempt) {
+    while (!attempt.getAsBoolean()) {
+      // Milliseconds to seconds; the previous divisor of 10000 always logged "0 seconds".
+      LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+      try {
+        Thread.sleep(RETRY_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        // Restore the flag and stop: with the flag set, every further sleep would throw
+        // immediately and the loop would spin.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to retry, giving up");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Returns the info route of the node controller described by {@code desc}. */
+  protected static String generateEndpoint(NodeInfoDescription desc) {
+    return generateEndpoint(desc, "");
+  }
+
+  /**
+   * Returns the info route of the node controller described by {@code desc}, extended by
+   * {@code subroute}.
+   *
+   * @param desc description providing hostname and port
+   * @param subroute suffix appended verbatim to the info route, e.g. {@code "/activate"}
+   */
+  protected static String generateEndpoint(NodeInfoDescription desc, String subroute) {
+    return PROTOCOL + desc.getHostname() + COLON + desc.getPort() + BASE_NODE_CONTROLLER_INFO_ROUTE + subroute;
+  }
+
+  /**
+   * Executes an HTTP PUT with a JSON body.
+   *
+   * @return {@code true} if the request was executed (the HTTP status is not inspected),
+   *     {@code false} if an {@link IOException} occurred
+   */
+  protected static boolean put(String url, String body) {
+    try {
+      Request.Put(url)
+          .bodyString(body, ContentType.APPLICATION_JSON)
+          .connectTimeout(CONNECT_TIMEOUT)
+          .execute()
+          .discardContent(); // consume the reply so the underlying connection is released
+      return true;
+    } catch (IOException e) {
+      LOG.warn("PUT request to {} failed", url, e);
+      return false;
+    }
+  }
+
+  /**
+   * Executes an HTTP POST with an empty JSON object as body.
+   *
+   * @return {@code true} if the request was executed (the HTTP status is not inspected),
+   *     {@code false} if an {@link IOException} occurred
+   */
+  protected static boolean post(String url) {
+    try {
+      Request.Post(url)
+          .bodyString("{}", ContentType.APPLICATION_JSON)
+          .connectTimeout(CONNECT_TIMEOUT)
+          .execute()
+          .discardContent(); // consume the reply so the underlying connection is released
+      return true;
+    } catch (IOException e) {
+      LOG.warn("POST request to {} failed", url, e);
+      return false;
+    }
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index d37e23a..3f58f2f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -17,68 +17,81 @@
*/
package org.apache.streampipes.manager.node;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
-public enum NodeClusterManager {
- INSTANCE;
+public class NodeClusterManager extends AbstractClusterManager {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
- private static final Logger LOG =
- LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
-
-
- public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
- boolean successfullyUpdated = false;
- try {
- String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
- String url = makeNodeControllerEndpoint(desc);
+ /**
+  * Returns the nodes that node storage reports as active.
+  *
+  * <p>Nodes are read from storage; they are not fetched via {@code AvailableNodesFetcher}.
+  */
+ public static List<NodeInfoDescription> getAvailableNodes() {
+   List<NodeInfoDescription> activeNodes =
+       StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
+   return activeNodes;
+ }
- LOG.info("Trying to update description for node controller: " + url);
+ /**
+  * Returns every node held in node storage.
+  *
+  * <p>Nodes are read from storage; they are not fetched via {@code AvailableNodesFetcher}.
+  */
+ public static List<NodeInfoDescription> getAllNodes() {
+   List<NodeInfoDescription> storedNodes =
+       StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+   return storedNodes;
+ }
- boolean connected = false;
- while (!connected) {
- connected = put(url, body);
+ /**
+  * Syncs the description with its remote node controller and, if that succeeds, persists it.
+  *
+  * @param desc the updated node description
+  * @return a success message after the description was stored, otherwise an error message
+  */
+ public static Message updateNode(NodeInfoDescription desc) {
+   // Storage is only touched after the remote sync reported success.
+   if (!syncWithRemoteNodeController(desc)) {
+     return Notifications.error("Could not update node");
+   }
+   StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
+   return Notifications.success("Node updated");
+ }
- if (!connected) {
- LOG.info("Retrying in 5 seconds");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- successfullyUpdated = true;
- } catch (IOException e) {
- e.printStackTrace();
+ public static boolean deactivateNode(String nodeControllerId) {
+ Optional<NodeInfoDescription> storedNode =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+ boolean status = false;
+ if (storedNode.isPresent()) {
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deactivateNode(nodeControllerId);
+ status = syncStateUpdateWithRemoteNodeController(storedNode.get(), false);
}
- return successfullyUpdated;
+ return status;
}
- private String makeNodeControllerEndpoint(NodeInfoDescription desc) {
- return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/info";
+ public static boolean activateNode(String nodeControllerId) {
+ Optional<NodeInfoDescription> storedNode =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+ boolean status = false;
+ if (storedNode.isPresent()) {
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().activateNode(nodeControllerId);
+ status = syncStateUpdateWithRemoteNodeController(storedNode.get(), true);
+ }
+ return status;
}
- private boolean put(String url, String body) {
- try {
- Request.Put(url)
- .bodyString(body, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute();
- return true;
- } catch (IOException e) {
- e.printStackTrace();
+ public static void addNode(NodeInfoDescription desc) {
+ List<NodeInfoDescription> allNodes =
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+
+ boolean alreadyRegistered = false;
+ if (allNodes.size() > 0) {
+ alreadyRegistered = allNodes.stream()
+ .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
+ }
+
+ if (!alreadyRegistered) {
+ LOG.info("New cluster node join registration request on from http://{}:{}", desc.getHostname(), desc.getPort());
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
+ LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+ } else {
+ LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
}
- return false;
+ }
+
+ public static void deleteNode(String nodeControllerId) {
+ StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index eb71a42..a3696a3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -26,8 +26,6 @@ import org.apache.streampipes.manager.execution.http.PipelineExecutor;
import org.apache.streampipes.manager.execution.http.PipelineStorageService;
import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
-import org.apache.streampipes.manager.node.AvailableNodesFetcher;
-import org.apache.streampipes.manager.node.NodeClusterManager;
import org.apache.streampipes.manager.recommender.ElementRecommender;
import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
import org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
@@ -40,12 +38,9 @@ import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
import org.apache.streampipes.model.message.DataSetModificationMessage;
import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -188,41 +183,4 @@ public class Operations {
public static String getRuntimeInfo(SpDataStream spDataStream) throws SpRuntimeException {
return PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
}
-
- public static List<NodeInfoDescription> getAvailableNodes() {
- //return new AvailableNodesFetcher().fetchNodes();
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
- }
-
- public static List<NodeInfoDescription> getAllNodes() {
- //return new AvailableNodesFetcher().fetchNodes();
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
- }
-
-
- public static Message updateNode(NodeInfoDescription desc) {
- boolean successfullyUpdated = NodeClusterManager.INSTANCE.updateNodeInfoDescription(desc);
- if (successfullyUpdated) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
- return Notifications.success("Node modified");
- }
- return Notifications.error("Could not modify node");
- }
-
- public static void addNode(NodeInfoDescription desc) {
-
- List<NodeInfoDescription> allNodes =
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
-
- boolean alreadyRegistered = allNodes.stream()
- .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
-
- if (!alreadyRegistered) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
- }
- }
-
- public static void deleteNode(String nodeControllerId) {
- StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
- }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index d522345..d4f1b1f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -32,7 +32,7 @@ import java.util.*;
public class CouchDbInstallationStep implements InstallationStep {
- private static List<String> initRdfEndpointPorts =
+ private static final List<String> initRdfEndpointPorts =
Collections.singletonList("8099/api/v1/admin@streampipes.org/master/sources/");
private static final String initRdfEndpointHost = "http://localhost:";
@@ -79,6 +79,7 @@ public class CouchDbInstallationStep implements InstallationStep {
Utils.getCouchDbDashboardWidgetClient();
Utils.getCouchDbLabelClient();
Utils.getCouchDbCategoryClient();
+ Utils.getCouchDbNodeClient();
return Collections.singletonList(Notifications.success(getTitle()));
} catch (Exception e) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
index 7ec4697..b5ba229 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
@@ -29,23 +29,28 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.LoggerFactory;
import java.util.logging.Logger;
public class TypeExtractor {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TypeExtractor.class.getCanonicalName());
- private static final Logger logger = Logger.getAnonymousLogger();
+ private static final String DATA_PROCESSOR_PREFIX = "sepa";
+ private static final String DATA_SINK_PREFIX = "sec";
+ private static final String DATA_SOURCE_PREFIX = "sep";
- private String pipelineElementDescription;
+ private final String pipelineElementDescription;
public TypeExtractor(String pipelineElementDescription) {
this.pipelineElementDescription = pipelineElementDescription;
-
}
public ElementVerifier<?> getTypeVerifier() throws SepaParseException {
try {
- ObjectNode jsonNode = JacksonSerializer.getObjectMapper().readValue(this.pipelineElementDescription, ObjectNode.class);
+ ObjectNode jsonNode = JacksonSerializer
+ .getObjectMapper()
+ .readValue(this.pipelineElementDescription, ObjectNode.class);
String jsonClassName = jsonNode.get("@class").asText();
return getTypeDef(jsonClassName);
} catch (JsonProcessingException e) {
@@ -57,24 +62,30 @@ public class TypeExtractor {
if (jsonClassName == null) {
throw new SepaParseException();
} else {
- if (jsonClassName.equals(ep())) { logger.info("Detected type sep"); return new SepVerifier(pipelineElementDescription); }
- else if (jsonClassName.equals(epa())) { logger.info("Detected type sepa"); return new SepaVerifier(pipelineElementDescription); }
- else if (jsonClassName.equals(ec())) { logger.info("Detected type sec"); return new SecVerifier(pipelineElementDescription); }
- else throw new SepaParseException();
+ if (jsonClassName.equals(ep())) {
+ LOG.debug("Detected active pipeline element type {}", DATA_SOURCE_PREFIX);
+ return new SepVerifier(pipelineElementDescription);
+ } else if (jsonClassName.equals(epa())) {
+ LOG.info("Detected active pipeline element type {}", DATA_PROCESSOR_PREFIX);
+ return new SepaVerifier(pipelineElementDescription);
+ } else if (jsonClassName.equals(ec())) {
+ LOG.info("Detected active pipeline element type {}", DATA_SINK_PREFIX);
+ return new SecVerifier(pipelineElementDescription);
+ } else throw new SepaParseException();
}
}
- private static final String ep()
+ private static String ep()
{
return DataSourceDescription.class.getCanonicalName();
}
- private static final String epa()
+ private static String epa()
{
return DataProcessorDescription.class.getCanonicalName();
}
- private static final String ec()
+ private static String ec()
{
return DataSinkDescription.class.getCanonicalName();
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
index e5ae656..484d084 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
@@ -29,6 +29,8 @@ public interface INode {
Response deleteNode(String username, String nodeControllerId);
+ Response changeNodeState(String action, String username, String nodeControllerId);
+
Response getAvailableNodes();
Response getNodes();
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index b7f2dee..df43c35 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -17,14 +17,12 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.node.NodeClusterManager;
+import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.rest.api.INode;
-import org.apache.streampipes.rest.shared.annotation.GsonClientModel;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@@ -32,18 +30,20 @@ import javax.ws.rs.core.Response;
@Path("/v2/users/{username}/nodes")
public class Node extends AbstractRestInterface implements INode {
- private static final Logger LOG = LoggerFactory.getLogger(Node.class.getCanonicalName());
+
+ private static final String ACTIVATE = "activate";
+ private static final String DEACTIVATE = "deactivate";
@POST
@JacksonSerialized
@Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
@Override
public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
- Operations.addNode(desc);
- return statusMessage(Notifications.success("Node added"));
+ NodeClusterManager.addNode(desc);
+ return statusMessage(Notifications.success(NotificationType.STORAGE_SUCCESS));
}
-
@PUT
@JacksonSerialized
@Path("/{nodeControllerId}")
@@ -52,7 +52,29 @@ public class Node extends AbstractRestInterface implements INode {
@Override
public Response updateNode(@PathParam("username") String username,
@PathParam("nodeControllerId") String nodeControllerId, NodeInfoDescription desc) {
- return statusMessage(Operations.updateNode(desc));
+ return statusMessage(NodeClusterManager.updateNode(desc));
+ }
+
+ /**
+  * Activates or deactivates the node controller with the given id.
+  *
+  * @param action {@code "activate"} or {@code "deactivate"}; any other value is reported as an error
+  * @param username user segment of the resource path (not used by this method)
+  * @param nodeControllerId id of the node controller whose state should change
+  * @return a status message of type {@code OPERATION_SUCCESS} if the state change succeeded,
+  *     {@code NODE_STATE_UPDATE_ERROR} otherwise
+  */
+ @POST
+ @JacksonSerialized
+ @Path("/{action}/{nodeControllerId}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Override
+ public Response changeNodeState(@PathParam("action") String action,
+                                 @PathParam("username") String username,
+                                 @PathParam("nodeControllerId") String nodeControllerId) {
+   boolean stateChanged;
+   switch (action) {
+     case ACTIVATE:
+       stateChanged = NodeClusterManager.activateNode(nodeControllerId);
+       break;
+     case DEACTIVATE:
+       stateChanged = NodeClusterManager.deactivateNode(nodeControllerId);
+       break;
+     default:
+       // Unknown actions fall through to the error response.
+       stateChanged = false;
+       break;
+   }
+   if (!stateChanged) {
+     return statusMessage(Notifications.error(NotificationType.NODE_STATE_UPDATE_ERROR));
+   }
+   return statusMessage(Notifications.success(NotificationType.OPERATION_SUCCESS));
+ }
@DELETE
@@ -60,8 +82,8 @@ public class Node extends AbstractRestInterface implements INode {
@Override
public Response deleteNode(@PathParam("username") String username,
@PathParam("nodeControllerId") String nodeControllerId) {
- Operations.deleteNode(nodeControllerId);
- return statusMessage(Notifications.success("Node deleted"));
+ NodeClusterManager.deleteNode(nodeControllerId);
+ return statusMessage(Notifications.success(NotificationType.REMOVED_NODE));
}
@GET
@@ -70,8 +92,7 @@ public class Node extends AbstractRestInterface implements INode {
@Produces(MediaType.APPLICATION_JSON)
@Override
public Response getAvailableNodes() {
- // TODO: get from couchdb not from consul
- return ok(Operations.getAvailableNodes());
+ return ok(NodeClusterManager.getAvailableNodes());
}
@GET
@@ -79,6 +100,6 @@ public class Node extends AbstractRestInterface implements INode {
@Produces(MediaType.APPLICATION_JSON)
@Override
public Response getNodes() {
- return ok(Operations.getAllNodes());
+ return ok(NodeClusterManager.getAllNodes());
}
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
index 54b23f8..cb9a033 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
@@ -42,7 +42,7 @@ public class PipelineElementImportNoUser extends AbstractRestInterface {
@Context
UriInfo uri;
- @Path("/")
+// @Path("/")
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response addElement(@PathParam("username") String username, @FormParam("uri") String uri, @FormParam("publicElement") boolean publicElement) {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index 637b2c4..e035491 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -24,12 +24,14 @@ import org.apache.streampipes.model.constants.PropertySelectorConstants;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.staticproperty.MappingProperty;
import org.apache.streampipes.model.staticproperty.MappingPropertyNary;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.sdk.helpers.CollectedResourceRequirements;
import org.apache.streampipes.sdk.helpers.CollectedStreamRequirements;
import org.apache.streampipes.sdk.helpers.Label;
@@ -43,6 +45,8 @@ public abstract class AbstractProcessingElementBuilder<BU extends
protected List<SpDataStream> streamRequirements;
+ protected List<NodeResourceRequirement> resourceRequirements;
+
protected List<EventProperty> stream1Properties;
protected List<EventProperty> stream2Properties;
@@ -57,6 +61,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
this.streamRequirements = new ArrayList<>();
this.stream1Properties = new ArrayList<>();
this.stream2Properties = new ArrayList<>();
+ this.resourceRequirements = new ArrayList<>();
this.supportedGrounding = new EventGrounding();
}
@@ -65,6 +70,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
this.streamRequirements = new ArrayList<>();
this.stream1Properties = new ArrayList<>();
this.stream2Properties = new ArrayList<>();
+ this.resourceRequirements = new ArrayList<>();
this.supportedGrounding = new EventGrounding();
}
@@ -93,7 +99,15 @@ public abstract class AbstractProcessingElementBuilder<BU extends
return me();
}
- public BU requiredResource() {
+ /**
+ * Adds node resource requirements to this element. Every requirement in the given bundle is
+ * appended to the requirements collected so far, so repeated calls accumulate rather than
+ * replace. Use {@link ResourceRequirementsBuilder} to create the bundle.
+ *
+ * @param resourceRequirements the collected requirements to add, see
+ * {@link CollectedResourceRequirements}; must not be {@code null} because its list is read directly
+ * @return the result of {@code me()}, for call chaining
+ */
+ public BU requiredResource(CollectedResourceRequirements resourceRequirements) {
+ this.resourceRequirements.addAll(resourceRequirements.getResourceRequirements());
return me();
}
@@ -279,6 +293,9 @@ public abstract class AbstractProcessingElementBuilder<BU extends
this.elementDescription.setSpDataStreams(streamRequirements);
+ if (this.resourceRequirements.size() > 0) {
+ this.elementDescription.setResourceRequirements(resourceRequirements);
+ }
}
private SpDataStream buildStream(List<EventProperty> streamProperties) {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
index 9a1dbbf..1f99575 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
@@ -102,6 +102,7 @@ public class ProcessingElementBuilder extends AbstractProcessingElementBuilder<P
public void prepareBuild() {
super.prepareBuild();
this.elementDescription.setOutputStrategies(outputStrategies);
+ // NOTE(review): super.prepareBuild() runs first. If it resolves to the
+ // AbstractProcessingElementBuilder code that already calls setResourceRequirements when the
+ // list is non-empty, this unconditional call only differs for an empty list -- confirm that
+ // setting an empty list here is intended, otherwise this line is redundant.
+ this.elementDescription.setResourceRequirements(resourceRequirements);
}
@Override
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
similarity index 50%
copy from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
copy to streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
index cc1cb92..94b3590 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
@@ -15,25 +15,33 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.sdk.builder;
-package org.apache.streampipes.storage.api;
-import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
+import org.apache.streampipes.sdk.helpers.CollectedResourceRequirements;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
-public interface INodeInfoStorage {
+/**
+ * Fluent builder that gathers {@link NodeResourceRequirement}s and wraps them in a
+ * {@link CollectedResourceRequirements} bundle.
+ */
+public class ResourceRequirementsBuilder {
- List<NodeInfoDescription> getAllNodes();
+ // Requirements added so far, in the order they were added.
+ private final List<NodeResourceRequirement> resourceRequirements;
- List<NodeInfoDescription> getAllActiveNodes();
+ /** Creates an empty builder; equivalent to calling the public constructor. */
+ public static ResourceRequirementsBuilder create() {
+ return new ResourceRequirementsBuilder();
+ }
- void storeNode(NodeInfoDescription desc);
+ /** Creates a builder with no requirements. */
+ public ResourceRequirementsBuilder() {
+ this.resourceRequirements = new ArrayList<>();
+ }
- void updateNode(NodeInfoDescription desc);
+ /**
+ * Appends one requirement to the builder.
+ *
+ * @param nrp the requirement to add; stored as given, without validation
+ * @return this builder, for call chaining
+ */
+ public ResourceRequirementsBuilder requiredProperty(NodeResourceRequirement nrp) {
+ this.resourceRequirements.add(nrp);
+ return this;
+ }
- Optional<NodeInfoDescription> getNode(String nodeControllerId);
-
- void deleteNode(String nodeControllerId);
+ /**
+ * Passes the requirements gathered so far to a new {@link CollectedResourceRequirements}.
+ *
+ * @return the bundle to hand to a processing element builder
+ */
+ public CollectedResourceRequirements build() {
+ return new CollectedResourceRequirements(this.resourceRequirements);
+ }
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
index 20a8786..77b54db 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
@@ -17,5 +17,28 @@
*/
package org.apache.streampipes.sdk.helpers;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
+
+import java.util.ArrayList;
+import java.util.List;
+
public class CollectedResourceRequirements {
+
+ // Owned by this instance: never null and never the same list object the caller passed in.
+ private List<NodeResourceRequirement> resourceRequirements;
+
+ /**
+  * Creates a bundle holding a snapshot of the given requirements.
+  *
+  * @param resourceRequirements requirements to collect; {@code null} is treated as empty
+  */
+ public CollectedResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+   this.resourceRequirements = snapshot(resourceRequirements);
+ }
+
+ /** Creates an empty bundle. */
+ public CollectedResourceRequirements() {
+   this.resourceRequirements = new ArrayList<>();
+ }
+
+ /**
+  * @return the requirements held by this bundle (the internal list, never {@code null})
+  */
+ public List<NodeResourceRequirement> getResourceRequirements() {
+   return resourceRequirements;
+ }
+
+ /**
+  * Replaces the held requirements with a snapshot of the given list.
+  *
+  * @param resourceRequirements new requirements; {@code null} is treated as empty
+  */
+ public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+   this.resourceRequirements = snapshot(resourceRequirements);
+ }
+
+ // Copies the list so later changes to the source (e.g. a builder that keeps adding after
+ // build()) cannot leak into this bundle, and maps null to an empty list so callers such as
+ // addAll(getResourceRequirements()) do not fail.
+ private static List<NodeResourceRequirement> snapshot(List<NodeResourceRequirement> source) {
+   return source == null ? new ArrayList<>() : new ArrayList<>(source);
+ }
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
similarity index 79%
copy from streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
copy to streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
index 20a8786..457fba5 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
@@ -17,5 +17,13 @@
*/
package org.apache.streampipes.sdk.helpers;
-public class CollectedResourceRequirements {
+import org.apache.streampipes.model.resource.Hardware;
+
+/** Static factory methods for commonly used node resource requirements. */
+public class NodeResourceRequirements {
+
+ /**
+  * Creates a hardware requirement with the GPU flag switched on.
+  *
+  * @return a new {@link Hardware} instance on which {@code setGpu(true)} has been called
+  */
+ public static Hardware gpu() {
+   final Hardware gpuRequirement = new Hardware();
+   gpuRequirement.setGpu(true);
+   return gpuRequirement;
+ }
}
diff --git a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
index 357a5e4..cf056c2 100644
--- a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
+++ b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
@@ -49,6 +49,7 @@ import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.quality.EventPropertyQualityDefinition;
import org.apache.streampipes.model.quality.EventStreamQualityDefinition;
import org.apache.streampipes.model.quality.Frequency;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.ValueSpecification;
import org.apache.streampipes.model.staticproperty.MappingProperty;
@@ -118,6 +119,7 @@ public class GsonSerializer {
builder.registerTypeAdapter(ContainerRuntime.class, new JsonLdSerializer<ContainerRuntime>());
builder.registerTypeAdapter(DeploymentContainer.class, new JsonLdSerializer<DeploymentContainer>());
+ builder.registerTypeAdapter(NodeResourceRequirement.class, new JsonLdSerializer<NodeResourceRequirement>());
builder.setPrettyPrinting();
return builder;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 4a3cf13..b9ee0fa 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -57,6 +57,7 @@ import org.apache.streampipes.model.node.resources.software.NvidiaContainerRunti
import org.apache.streampipes.model.node.resources.software.SoftwareResource;
import org.apache.streampipes.model.output.*;
import org.apache.streampipes.model.quality.*;
+import org.apache.streampipes.model.resource.Hardware;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.model.schema.*;
@@ -211,7 +212,8 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
DISK.class,
GPU.class,
SoftwareResource.class,
- FieldDeviceAccessResource.class
+ FieldDeviceAccessResource.class,
+ Hardware.class
);
}
}
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
index cc1cb92..b1dfa29 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
@@ -31,6 +31,10 @@ public interface INodeInfoStorage {
void storeNode(NodeInfoDescription desc);
+ void deactivateNode(String nodeControllerId);
+
+ void activateNode(String nodeControllerId);
+
void updateNode(NodeInfoDescription desc);
Optional<NodeInfoDescription> getNode(String nodeControllerId);
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
index 6a8a7e4..5107bfa 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
@@ -50,12 +50,36 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
@Override
public void storeNode(NodeInfoDescription desc) {
- LOG.info("Store new node description with node id={}, url={}", desc.getNodeControllerId(),
- desc.getHostname() + ":" + desc.getPort());
persist(desc);
}
+ /**
+  * Marks the stored node as inactive and writes it back. Nothing happens when no node with the
+  * given id is found.
+  *
+  * @param nodeControllerId id used to look the node up via {@code getNode}
+  */
@Override
+ public void deactivateNode(String nodeControllerId) {
+   // Single lookup; the lambda only runs when a stored description exists.
+   getNode(nodeControllerId).ifPresent(node -> {
+     LOG.info("Deactivate node controller={} at url=http://{}:{}",
+         node.getNodeControllerId(),
+         node.getHostname(),
+         node.getPort());
+     node.setActive(false);
+     update(node);
+   });
+ }
+
+ /**
+  * Marks the stored node as active and writes it back. Nothing happens when no node with the
+  * given id is found.
+  *
+  * @param nodeControllerId id used to look the node up via {@code getNode}
+  */
+ @Override
+ public void activateNode(String nodeControllerId) {
+   // Single lookup; the lambda only runs when a stored description exists.
+   getNode(nodeControllerId).ifPresent(node -> {
+     LOG.info("Activate node controller={} at url=http://{}:{}",
+         node.getNodeControllerId(),
+         node.getHostname(),
+         node.getPort());
+     node.setActive(true);
+     update(node);
+   });
+ }
+
+ @Override
public void updateNode(NodeInfoDescription desc) {
LOG.info("Update node description for node id={}, url={}", desc.getNodeControllerId(),
desc.getHostname() + ":" + desc.getPort());
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index b706a6a..e2a2150 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -484,4 +484,9 @@ public class StreamPipes {
public static final String HAS_HARDWARE_RESOURCES = NS + "hasHardwareResources";
public static final String HAS_CONTAINER_RUNTIME_SERVER_VERSION = NS + "hasContainerRuntimeServerVersion";
public static final String HAS_CONTAINER_RUNTIME_API_VERSION = NS + "hasContainerRuntimeApiVersion";
+ public static final String HARDWARE_REQUIREMENT = NS + "HardwareRequirement";
+ public static final String HAS_GPU_REQUIREMENT = NS + "hasGpuRequirement";
+ public static final String HAS_NODE_RESOURCE_PROPERTY = NS + "hasNodeResourceProperty";
+ public static final String NODE_RESOURCE_REQUIREMENT = NS + "NodeResourceRequirement";
+ public static final String REQUIRES_RESOURCES = NS + "requiresResources";
}
diff --git a/ui/src/app/configuration/node-configuration/node-configuration.component.ts b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
index 7f295c6..46cd508 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
@@ -110,7 +110,13 @@ export class NodeConfigurationComponent implements OnInit{
// No adapters detected on this node. This means that no adapter was created on this host. Thus
// we can safely proceed setting a new state, i.e. active = (true || false) this node
node.active = desiredState;
- this.nodeService.updateNodeState(node).subscribe(statusMessage => {
+ let stateService;
+ if (desiredState) {
+ stateService = this.nodeService.activateNode(node.nodeControllerId);
+ } else {
+ stateService = this.nodeService.deactivateNode(node.nodeControllerId);
+ }
+ stateService.subscribe(statusMessage => {
if(statusMessage.success) {
this.openSnackBar("Node successfully " + (desiredState ? "activated" : "deactivated"))
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 2650545..44fde71 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,11 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-12-30 22:52:01.
+// Generated using typescript-generator version 2.24.612 on 2021-01-09 20:47:18.
export class AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ elementId: string;
static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
if (!data) {
@@ -30,13 +31,13 @@ export class AbstractStreamPipesEntity {
}
const instance = target || new AbstractStreamPipesEntity();
instance["@class"] = data["@class"];
+ instance.elementId = data.elementId;
return instance;
}
}
export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
- elementId: string;
+ "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
if (!data) {
@@ -44,7 +45,6 @@ export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
}
const instance = target || new UnnamedStreamPipesEntity();
super.fromData(data, instance);
- instance.elementId = data.elementId;
return instance;
}
}
@@ -125,7 +125,6 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
connectedTo: string[];
description: string;
dom: string;
- elementId: string;
iconUrl: string;
includedAssets: string[];
includedLocales: string[];
@@ -143,7 +142,6 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.name = data.name;
instance.description = data.description;
instance.iconUrl = data.iconUrl;
- instance.elementId = data.elementId;
instance.appId = data.appId;
instance.includesAssets = data.includesAssets;
instance.includesLocales = data.includesLocales;
@@ -1009,6 +1007,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
elementEndpointPort: number;
elementEndpointServiceName: string;
inputStreams: SpDataStreamUnion[];
+ resourceRequirements: NodeResourceRequirementUnion[];
staticProperties: StaticPropertyUnion[];
statusInfoSettings: ElementStatusInfoSettings;
streamRequirements: SpDataStreamUnion[];
@@ -1029,6 +1028,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
instance.correspondingPipeline = data.correspondingPipeline;
instance.correspondingUser = data.correspondingUser;
instance.streamRequirements = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.streamRequirements);
+ instance.resourceRequirements = __getCopyArrayFn(NodeResourceRequirement.fromDataUnion)(data.resourceRequirements);
instance.elementEndpointHostname = data.elementEndpointHostname;
instance.elementEndpointPort = data.elementEndpointPort;
instance.deploymentTargetNodeId = data.deploymentTargetNodeId;
@@ -1847,6 +1847,44 @@ export class GuessSchema extends UnnamedStreamPipesEntity {
}
}
+// Generated model class (see the typescript-generator banner at the top of this file); these
+// comments will be lost on regeneration.
+export class NodeResourceRequirement extends UnnamedStreamPipesEntity {
+ "@class": "org.apache.streampipes.model.resource.NodeResourceRequirement" | "org.apache.streampipes.model.resource.Hardware";
+
+ // Copies `data` into `target` (or a new instance) by delegating to the parent fromData.
+ // Falsy input is returned unchanged.
+ static fromData(data: NodeResourceRequirement, target?: NodeResourceRequirement): NodeResourceRequirement {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new NodeResourceRequirement();
+ super.fromData(data, instance);
+ return instance;
+ }
+
+ // Dispatches on the "@class" discriminator. Only the Hardware case is handled; any other
+ // value falls out of the switch and the function returns undefined.
+ static fromDataUnion(data: NodeResourceRequirementUnion): NodeResourceRequirementUnion {
+ if (!data) {
+ return data;
+ }
+ switch (data["@class"]) {
+ case "org.apache.streampipes.model.resource.Hardware":
+ return Hardware.fromData(data);
+ }
+ }
+}
+
+// Generated model class for the Hardware requirement; carries a single `gpu` flag.
+export class Hardware extends NodeResourceRequirement {
+ "@class": "org.apache.streampipes.model.resource.Hardware";
+ gpu: boolean;
+
+ // Copies `data` into `target` (or a new instance): parent fields via super.fromData, then
+ // the `gpu` flag. Falsy input is returned unchanged.
+ static fromData(data: Hardware, target?: Hardware): Hardware {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new Hardware();
+ super.fromData(data, instance);
+ instance.gpu = data.gpu;
+ return instance;
+ }
+}
+
export class HardwareResource extends UnnamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.node.resources.hardware.HardwareResource";
cpu: CPU;
@@ -2343,6 +2381,7 @@ export class Pipeline extends ElementComposition {
eventRelayStrategy: string;
pipelineCategories: string[];
publicElement: boolean;
+ restartOnSystemReboot: boolean;
running: boolean;
startedAt: number;
@@ -2354,6 +2393,7 @@ export class Pipeline extends ElementComposition {
super.fromData(data, instance);
instance.actions = __getCopyArrayFn(DataSinkInvocation.fromData)(data.actions);
instance.running = data.running;
+ instance.restartOnSystemReboot = data.restartOnSystemReboot;
instance.startedAt = data.startedAt;
instance.createdAt = data.createdAt;
instance.publicElement = data.publicElement;
@@ -3225,6 +3265,8 @@ export type MappingPropertyUnion = MappingPropertyNary | MappingPropertyUnary;
export type MeasurementPropertyUnion = EventPropertyQualityDefinition | EventStreamQualityDefinition;
+export type NodeResourceRequirementUnion = Hardware;
+
export type OneOfStaticPropertyUnion = RuntimeResolvableOneOfStaticProperty;
export type OutputStrategyUnion = AppendOutputStrategy | CustomOutputStrategy | CustomTransformOutputStrategy | FixedOutputStrategy | KeepOutputStrategy | ListOutputStrategy | TransformOutputStrategy | UserDefinedOutputStrategy;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index fef866c..2571ead 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -23,7 +23,7 @@ import {
Message,
Pipeline,
NodeInfoDescription,
- StaticNodeMetadata
+ StaticNodeMetadata, NvidiaContainerRuntime, DockerContainerRuntime, ContainerRuntime, DataSinkInvocation
} from "../../../core-model/gen/streampipes-model";
import {ObjectProvider} from "../../services/object-provider.service";
import {EditorService} from "../../services/editor.service";
@@ -310,6 +310,4 @@ export class SavePipelineComponent implements OnInit {
this.applyDefaultPolicy(this.tmpPipeline.sepas);
}
}
-
-
}
\ No newline at end of file
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
index 1a99f9d..03a442b 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
@@ -78,7 +78,7 @@
matTooltipPosition="above"
[disabled]="!pipeline.running"
(click)="pipelineOperationsService.migratePipelineProcessors(pipeline)">
- <i class="material-icons">storage</i>
+ <i class="material-icons">swap_horizontal_circle_filled</i>
</button>
</span>
<span fxFlex fxFlexOrder="4" fxLayout="row" fxLayoutAlign="center center">
diff --git a/ui/src/app/platform-services/apis/node.service.ts b/ui/src/app/platform-services/apis/node.service.ts
index 290d36a..921edb5 100644
--- a/ui/src/app/platform-services/apis/node.service.ts
+++ b/ui/src/app/platform-services/apis/node.service.ts
@@ -52,4 +52,18 @@ export class NodeService {
return Message.fromData(response as Message);
}));
}
+
+ activateNode(nodeControllerId: string): Observable<Message> {
+   // POST with an empty body; the node is identified by the URL path alone.
+   const url = this.platformServicesCommons.authUserBasePath() + '/nodes/activate/' + nodeControllerId;
+   return this.http.post(url, {})
+     .pipe(map(response => Message.fromData(response as Message)));
+ }
+
+ deactivateNode(nodeControllerId: string): Observable<Message> {
+   // POST with an empty body; the node is identified by the URL path alone.
+   const url = this.platformServicesCommons.authUserBasePath() + '/nodes/deactivate/' + nodeControllerId;
+   return this.http.post(url, {})
+     .pipe(map(response => Message.fromData(response as Message)));
+ }
}
\ No newline at end of file