You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/01/02 18:29:33 UTC
[incubator-streampipes] 01/01: [STREAMPIPES-273] Remove DataSource
concept
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-272
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 631c078f4380ee6d927df9c2e7c813b575185e45
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jan 2 19:29:14 2021 +0100
[STREAMPIPES-273] Remove DataSource concept
---
.../backend/StreamPipesResourceConfig.java | 1 -
streampipes-connect-container-master/pom.xml | 7 +-
.../master/management/AdapterMasterManagement.java | 30 +--
.../master/management/SourcesManagement.java | 20 +-
.../container/master/rest/SourcesResource.java | 7 +-
.../extensions/ExtensionsResourceConfig.java | 2 +-
.../PipelineElementContainerResourceConfig.java | 2 +-
.../api/AbstractPipelineElementResource.java | 61 +----
.../api/DataSourcePipelineElementResource.java | 136 ----------
.../api/DataStreamPipelineElementResource.java | 103 +++++++
.../container/declarer/DataStreamDeclarer.java | 5 +-
.../declarer/SemanticEventProducerDeclarer.java | 1 +
.../streampipes/container/html/JSONGenerator.java | 1 +
.../container/html/model/Description.java | 9 +
.../html/page/EventConsumerWelcomePage.java | 98 +++----
.../html/page/EventProcessingAgentWelcomePage.java | 98 +++----
.../html/page/EventProducerWelcomePage.java | 126 ++++-----
.../html/page/WelcomePageGeneratorImpl.java | 129 +++++----
.../container/init/DeclarersSingleton.java | 40 +--
.../html/page/WelcomePageGeneratorImplTest.java | 133 ---------
.../model/client/endpoint/RdfEndpointItem.java | 12 +-
streampipes-model/pom.xml | 6 +-
.../org/apache/streampipes/model/SpDataStream.java | 12 +
.../model/base/AbstractStreamPipesEntity.java | 3 +
.../model/base/NamedStreamPipesEntity.java | 11 +
.../connect/adapter/AdapterStreamDescription.java | 4 +-
.../model/graph/DataSourceDescription.java | 1 +
.../org/apache/streampipes/model/util/Cloner.java | 6 +-
.../manager/execution/http/PipelineExecutor.java | 9 -
.../execution/status/SepMonitoringManager.java | 50 ----
.../manager/monitoring/job/JobManager.java | 64 -----
.../manager/monitoring/job/MonitoringJob.java | 81 ------
.../monitoring/job/MonitoringJobExecutor.java | 46 ----
.../manager/monitoring/job/MonitoringUtils.java | 39 ---
.../manager/monitoring/job/SecMonitoringJob.java | 36 ---
.../manager/monitoring/job/SepaMonitoringJob.java | 36 ---
.../pipeline/PipelineExecutionStatusCollector.java | 24 --
.../monitoring/runtime/EpRuntimeMonitoring.java | 27 --
.../manager/monitoring/runtime/EventGenerator.java | 72 -----
.../monitoring/runtime/FormatGenerator.java | 27 --
.../monitoring/runtime/JsonFormatGenerator.java | 23 --
.../monitoring/runtime/PipelineObserver.java | 92 -------
.../monitoring/runtime/PipelineStreamReplacer.java | 109 --------
.../monitoring/runtime/ProtocolHandler.java | 30 ---
.../monitoring/runtime/RandomDataGenerator.java | 74 -----
.../monitoring/runtime/RandomEventGenerator.java | 56 ----
.../monitoring/runtime/SchemaGenerator.java | 46 ----
.../monitoring/runtime/SepStoppedMonitoring.java | 178 ------------
.../SepStoppedMonitoringPipelineBuilder.java | 191 -------------
.../monitoring/runtime/SimilarStreamFinder.java | 112 --------
.../runtime/StaticPropertyGenerator.java | 24 --
.../monitoring/runtime/StreamGenerator.java | 23 --
.../monitoring/runtime/ThriftFormatGenerator.java | 23 --
.../monitoring/task/CompareDescriptionTask.java | 23 --
.../monitoring/task/GetDescriptionTask.java | 71 -----
.../manager/monitoring/task/InvokeRuntimeTask.java | 23 --
.../manager/monitoring/task/RemoveRuntimeTask.java | 23 --
.../manager/monitoring/task/TaskDefinition.java | 53 ----
.../manager/monitoring/task/VerifySchemaTask.java | 23 --
...epaVerifier.java => DataProcessorVerifier.java} | 4 +-
.../{SecVerifier.java => DataSinkVerifier.java} | 4 +-
.../{SepVerifier.java => DataStreamVerifier.java} | 27 +-
.../manager/verification/ElementVerifier.java | 24 +-
.../verification/extractor/TypeExtractor.java | 16 +-
.../streampipes/manager/matching/v2/TestUtils.java | 4 +-
.../manager/pipeline/TestSerializer.java | 48 ----
.../streampipes/rest/impl/ApplicationLink.java | 79 ------
.../rest/impl/InternalPipelineTemplates.java | 28 +-
.../rest/impl/OntologyPipelineElement.java | 12 +-
.../rest/impl/PipelineElementCategory.java | 8 +-
.../rest/impl/PipelineElementImport.java | 10 +-
.../streampipes/rest/impl/PipelineTemplate.java | 38 +--
.../apache/streampipes/rest/impl/RdfEndpoint.java | 20 +-
.../rest/impl/SemanticEventProducer.java | 31 +--
.../impl/nouser/PipelineElementImportNoUser.java | 4 +-
.../api/IPipelineElementDescriptionStorage.java | 31 ++-
.../rdf4j/impl/PipelineElementInMemoryStorage.java | 114 ++++----
.../rdf4j/impl/PipelineElementStorageRequests.java | 58 ++--
.../storage/rdf4j/sparql/QueryBuilder.java | 25 +-
.../apache/streampipes/vocabulary/StreamPipes.java | 4 +-
ui/src/app/add/add.component.html | 3 +-
ui/src/app/add/add.component.ts | 4 +-
.../endpoint-item/endpoint-item.component.html | 23 +-
.../endpoint-item/endpoint-item.component.ts | 14 +-
ui/src/app/connect/services/rest.service.ts | 2 +-
ui/src/app/core-model/gen/streampipes-model.ts | 297 ++-------------------
ui/src/app/editor/editor.component.ts | 8 +-
ui/src/app/home/components/status.component.ts | 15 +-
.../apis/pipeline-element.service.ts | 20 +-
ui/src/scss/sp/main.scss | 7 +-
90 files changed, 702 insertions(+), 3052 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 0a62a4d..75f3114 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -44,7 +44,6 @@ public class StreamPipesResourceConfig extends ResourceConfig {
public StreamPipesResourceConfig() {
register(Authentication.class);
- register(ApplicationLink.class);
register(AssetDashboard.class);
register(AutoComplete.class);
register(CategoryResource.class);
diff --git a/streampipes-connect-container-master/pom.xml b/streampipes-connect-container-master/pom.xml
index e44ed7e..91a8729 100644
--- a/streampipes-connect-container-master/pom.xml
+++ b/streampipes-connect-container-master/pom.xml
@@ -36,6 +36,11 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-management</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-storage-couchdb</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
@@ -77,4 +82,4 @@
<build>
<finalName>streampipes-connect-container-master</finalName>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 37e1686..c1f760b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -20,10 +20,13 @@ package org.apache.streampipes.connect.container.master.management;
import org.apache.http.client.fluent.Form;
import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.connect.adapter.GroundingService;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.config.ConnectContainerConfig;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.manager.verification.DataStreamVerifier;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
@@ -90,7 +93,6 @@ public class AdapterMasterManagement {
// TODO
WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
LOG.info("Start adapter");
-// SpConnect.startStreamAdapter((AdapterStreamDescription) ad, baseUrl);
}
// backend url is used to install data source in streampipes
@@ -98,31 +100,19 @@ public class AdapterMasterManagement {
String requestUrl = backendBaseUrl + "noauth/users/" + username + "/element";
LOG.info("Install source (source URL: " + newId + " in backend over URL: " + requestUrl);
+ SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(newId);
+ installDataSource(storedDescription, username);
- installDataSource(requestUrl, newId);
-
- return new SourcesManagement().getAdapterDataSource(newId).getElementId();
+ return storedDescription.getElementId();
}
- public boolean installDataSource(String requestUrl, String elementIdUrl) throws AdapterException {
-
+ public void installDataSource(SpDataStream stream, String username) throws AdapterException {
try {
- String responseString = Request.Post(requestUrl)
- .bodyForm(
- Form.form()
- .add("uri", elementIdUrl)
- .add("publicElement", "true").build())
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
-
- LOG.info(responseString);
- } catch (IOException e) {
- LOG.error("Error while installing data source: " + requestUrl, e);
+ new DataStreamVerifier(stream).verifyAndAdd(username, true, true);
+ } catch (SepaParseException e) {
+ LOG.error("Error while installing data source: " + stream.getElementId(), e);
throw new AdapterException();
}
-
- return true;
}
public AdapterDescription getAdapter(String id, AdapterStorageImpl adapterStorage) throws AdapterException {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 38f12ad..bbc2271 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -18,8 +18,6 @@
package org.apache.streampipes.connect.container.master.management;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
@@ -31,11 +29,12 @@ import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
@@ -126,6 +125,7 @@ public class SourcesManagement {
"This stream is generated by an StreamPipes Connect adapter. ID of adapter: " + ad.getId(), uri, streams);
dsd.setType("source");
dsd.setAppId(ad.getAppId());
+ dsd.setEditable(!(ad.isInternallyManaged()));
allAdapterDescriptions.add(dsd);
}
@@ -149,7 +149,7 @@ public class SourcesManagement {
return decryptedAdapterDescription;
}
- public DataSourceDescription getAdapterDataSource(String id) throws AdapterException {
+ public SpDataStream getAdapterDataStream(String id) throws AdapterException {
// AdapterDescription adapterDescription = new AdapterStorageImpl().getAdapter(id);
// get all Adapters and check id
@@ -179,17 +179,13 @@ public class SourcesManagement {
String url = adapterDescription.getUri();
ds.setName(adapterDescription.getName());
- ds.setDescription("Description");
+ ds.setDescription(adapterDescription.getDescription());
+ ds.setCorrespondingAdapterId(adapterDescription.getAdapterId());
+ ds.setInternallyManaged(true);
ds.setUri(url + "/streams");
- DataSourceDescription dataSourceDescription = new DataSourceDescription(
- url, "Adaper Data Source",
- "This data source contains one data stream from the adapters");
-
- dataSourceDescription.addEventStream(ds);
-
- return dataSourceDescription;
+ return ds;
}
public void setConnectHost(String connectHost) {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
index 7318bc2..672d7c2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.SourcesManagement;
import org.apache.streampipes.connect.rest.AbstractContainerResource;
import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -66,18 +65,14 @@ public class SourcesResource extends AbstractContainerResource {
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
public Response getAdapterDataSource(@PathParam("id") String id) {
-
try {
- DataSourceDescription result = this.sourcesManagement.getAdapterDataSource(id);
- return ok(result);
+ return ok(this.sourcesManagement.getAdapterDataStream(id));
} catch (AdapterException e) {
logger.error("Error while retrieving DataSourceDescription with id: " + id);
return fail();
}
}
-
-
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/{streamId}/streams")
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
index f595dd1..7a267a7 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -29,7 +29,7 @@ public class ExtensionsResourceConfig extends ResourceConfig {
public ExtensionsResourceConfig() {
register(DataSinkPipelineElementResource.class);
register(DataProcessorPipelineElementResource.class);
- register(DataSourcePipelineElementResource.class);
+ register(DataStreamPipelineElementResource.class);
register(WelcomePage.class);
register(PipelineElementTemplateResource.class);
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
index 69ef3a9..920c11d 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
@@ -28,7 +28,7 @@ public class PipelineElementContainerResourceConfig extends ResourceConfig {
public PipelineElementContainerResourceConfig() {
register(DataSinkPipelineElementResource.class);
register(DataProcessorPipelineElementResource.class);
- register(DataSourcePipelineElementResource.class);
+ register(DataStreamPipelineElementResource.class);
register(WelcomePage.class);
register(PipelineElementTemplateResource.class);
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
index 5b87557..2249acf 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
@@ -22,9 +22,7 @@ import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
import org.apache.streampipes.container.assets.AssetZipGenerator;
-import org.apache.streampipes.container.declarer.DataStreamDeclarer;
import org.apache.streampipes.container.declarer.Declarer;
-import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.locales.LabelGenerator;
import org.apache.streampipes.model.SpDataStream;
@@ -32,10 +30,10 @@ import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.rest.shared.util.SpMediaType;
import javax.ws.rs.GET;
@@ -55,12 +53,13 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
protected final String DATA_PROCESSOR_PREFIX = "sepa";
protected final String DATA_SINK_PREFIX = "sec";
- protected final String DATA_SOURCE_PREFIX = "sep";
+ protected final String DATA_STREAM_PREFIX = "stream";
private static final String SLASH = "/";
@GET
@Path("{id}")
@Produces(MediaType.APPLICATION_JSON)
+ @JacksonSerialized
public NamedStreamPipesEntity getDescription(@PathParam("id") String elementId) {
return prepareElement(elementId);
}
@@ -98,8 +97,8 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
return rewrite(getById(id));
}
- protected NamedStreamPipesEntity prepareElement(NamedStreamPipesEntity desc, String appendix) {
- return rewrite(desc, appendix);
+ protected NamedStreamPipesEntity prepareElement(NamedStreamPipesEntity desc) {
+ return rewrite(desc);
}
protected D getDeclarerById(String id) {
@@ -107,29 +106,11 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
}
protected NamedStreamPipesEntity getById(String id) {
- NamedStreamPipesEntity desc = null;
- Declarer declarer = getElementDeclarers().get(id);
- //TODO find a better solution to add the event streams to the SepDescription
- if (declarer instanceof SemanticEventProducerDeclarer) {
- DataSourceDescription secDesc = ((SemanticEventProducerDeclarer) declarer).declareModel();
- List<DataStreamDeclarer> eventStreamDeclarers = ((SemanticEventProducerDeclarer) declarer).getEventStreams();
- for (DataStreamDeclarer esd : eventStreamDeclarers) {
- secDesc.addEventStream(esd.declareModel(secDesc));
- }
-
- desc = secDesc;
- } else {
- desc = declarer.declareModel();
- }
-
- return desc;
+ Declarer<?> declarer = getElementDeclarers().get(id);
+ return declarer.declareModel();
}
protected NamedStreamPipesEntity rewrite(NamedStreamPipesEntity desc) {
- return rewrite(desc, "");
- }
-
- protected NamedStreamPipesEntity rewrite(NamedStreamPipesEntity desc, String appendix) {
//TODO remove this and find a better solution
if (desc != null) {
@@ -137,18 +118,14 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
if (desc instanceof DataProcessorDescription) {
type = DATA_PROCESSOR_PREFIX + SLASH;
- } else if (desc instanceof DataSourceDescription) {
- type = DATA_SOURCE_PREFIX + SLASH;
} else if (desc instanceof DataSinkDescription) {
type = DATA_SINK_PREFIX + SLASH;
} else if (desc instanceof SpDataStream) {
- type = DATA_SOURCE_PREFIX + SLASH + appendix + SLASH;
+ type = DATA_STREAM_PREFIX + SLASH;
}
- String originalId = desc.getElementId();
String uri = DeclarersSingleton.getInstance().getBaseUri() + type + desc.getElementId();
desc.setElementId(uri);
- desc.setElementId(uri);
// TODO remove after full internationalization support has been implemented
if (desc.isIncludesLocales()) {
@@ -159,27 +136,7 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
}
}
- if (desc instanceof DataSourceDescription) {
- for (SpDataStream stream : ((DataSourceDescription) desc).getSpDataStreams()) {
- String baseUri = DeclarersSingleton.getInstance().getBaseUri()
- + type
- + originalId
- + SLASH
- + stream.getElementId();
- stream.setElementId(baseUri);
- stream.setElementId(baseUri);
- // TODO remove after full internationalization support has been implemented
- if (stream.isIncludesLocales()) {
- try {
- LabelGenerator lg = new LabelGenerator(stream);
- stream.setName(lg.getElementTitle());
- stream.setDescription(lg.getElementDescription());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- } else if (desc instanceof ConsumableStreamPipesEntity) {
+ if (desc instanceof ConsumableStreamPipesEntity) {
Collection<TransportProtocol> supportedProtocols =
DeclarersSingleton.getInstance().getSupportedProtocols();
Collection<TransportFormat> supportedFormats =
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSourcePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSourcePipelineElementResource.java
deleted file mode 100644
index 1fd2c10..0000000
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSourcePipelineElementResource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.container.api;
-
-import org.apache.streampipes.container.assets.AssetZipGenerator;
-import org.apache.streampipes.container.declarer.DataSetDeclarer;
-import org.apache.streampipes.container.declarer.DataStreamDeclarer;
-import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.init.RunningDatasetInstances;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
-@Path("/sep")
-public class DataSourcePipelineElementResource extends AbstractPipelineElementResource<SemanticEventProducerDeclarer> {
-
- @Override
- protected Map<String, SemanticEventProducerDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getProducerDeclarers();
- }
-
- @GET
- @Path("{sourceId}/{streamId}")
- @Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response getDescription(@PathParam("sourceId") String sourceId, @PathParam("streamId") String streamId) {
- Optional<SpDataStream> stream = getStreamBySourceId(sourceId, streamId);
- if (stream.isPresent()) {
- return ok(prepareElement(stream.get(), getById(sourceId).getUri()));
- } else {
- return clientError();
- }
- }
-
- @GET
- @Path("{sourceId}/{streamId}/assets")
- @Produces("application/zip")
- public javax.ws.rs.core.Response getAssets(@PathParam("sourceId") String sourceId, @PathParam
- ("streamId") String streamId) {
- try {
- return ok(new AssetZipGenerator(streamId,
- getStreamBySourceId(sourceId, streamId).get()
- .getIncludedAssets()).makeZip());
- } catch (IOException e) {
- e.printStackTrace();
- return javax.ws.rs.core.Response.status(500).build();
- }
- }
-
- private Optional<SpDataStream> getStreamBySourceId(String sourceId, String streamId) {
- DataSourceDescription dataSourceDescription = (DataSourceDescription) getById(sourceId);
- return dataSourceDescription.getSpDataStreams().stream().filter(ds -> ds.getElementId().equals(streamId))
- .findFirst();
- }
-
- @POST
- @Path("{sourceId}/{streamId}")
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response invokeRuntime(@PathParam("sourceId") String sourceId, @PathParam("streamId") String streamId, String
- payload) {
- SemanticEventProducerDeclarer declarer = getDeclarerById(sourceId);
-
- Optional<DataStreamDeclarer> streamDeclarer = declarer
- .getEventStreams()
- .stream()
- .filter(sd -> sd.declareModel(declarer
- .declareModel())
- .getElementId()
- .equals(streamId))
- .findFirst();
-
- if (streamDeclarer.isPresent()) {
- try {
- SpDataSet dataSet = JacksonSerializer.getObjectMapper().readValue(payload, SpDataSet.class);
- String runningInstanceId = dataSet.getDatasetInvocationId();
- RunningDatasetInstances.INSTANCE.add(runningInstanceId, dataSet, (DataSetDeclarer) streamDeclarer.get().getClass().newInstance());
- RunningDatasetInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(dataSet, ()
- -> {
- // TODO notify
- });
- return ok(new Response(runningInstanceId, true));
- } catch (IOException | InstantiationException |
- IllegalAccessException e) {
- e.printStackTrace();
- return ok(new Response("", false, e.getMessage()));
- }
- }
- return ok(new Response("", false, "Could not find the element with id: " + ""));
-
- }
-
- @DELETE
- @Path("{sourceId}/{streamId}/{runningInstanceId}")
- @Produces(MediaType.APPLICATION_JSON)
- public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
-
- DataSetDeclarer runningInstance = RunningDatasetInstances.INSTANCE.getInvocation(runningInstanceId);
-
- if (runningInstance != null) {
- boolean detachSuccess = runningInstance.detachRuntime(runningInstanceId);
- Response resp = new Response("", detachSuccess);
- if (resp.isSuccess()) {
- RunningDatasetInstances.INSTANCE.remove(runningInstanceId);
- }
-
- return ok(resp);
- }
-
- return ok(new Response(runningInstanceId, false, "Could not find the running instance with id: " + runningInstanceId));
- }
-}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
new file mode 100644
index 0000000..42d3032
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.container.api;
+
+import org.apache.streampipes.container.assets.AssetZipGenerator;
+import org.apache.streampipes.container.declarer.DataSetDeclarer;
+import org.apache.streampipes.container.declarer.DataStreamDeclarer;
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.init.RunningDatasetInstances;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+@Path("/stream")
+public class DataStreamPipelineElementResource extends AbstractPipelineElementResource<DataStreamDeclarer> {
+
+ @Override
+ protected Map<String, DataStreamDeclarer> getElementDeclarers() {
+ return DeclarersSingleton.getInstance().getStreamDeclarers();
+ }
+
+ @GET
+ @Path("{streamId}/assets")
+ @Produces("application/zip")
+ public javax.ws.rs.core.Response getAssets(@PathParam("streamId") String streamId) {
+ try {
+ return ok(new AssetZipGenerator(streamId,
+ getById(streamId)
+ .getIncludedAssets()).makeZip());
+ } catch (IOException e) {
+ e.printStackTrace();
+ return javax.ws.rs.core.Response.status(500).build();
+ }
+ }
+
+ @POST
+ @Path("{streamId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response invokeRuntime(@PathParam("streamId") String streamId, String
+ payload) {
+ DataStreamDeclarer streamDeclarer = getDeclarerById(streamId);
+
+ try {
+ SpDataSet dataSet = JacksonSerializer.getObjectMapper().readValue(payload, SpDataSet.class);
+ String runningInstanceId = dataSet.getDatasetInvocationId();
+ RunningDatasetInstances.INSTANCE.add(runningInstanceId, dataSet, (DataSetDeclarer) streamDeclarer.getClass().newInstance());
+ RunningDatasetInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(dataSet, ()
+ -> {
+ // TODO notify
+ });
+ return ok(new Response(runningInstanceId, true));
+ } catch (IOException | InstantiationException |
+ IllegalAccessException e) {
+ e.printStackTrace();
+ return ok(new Response("", false, e.getMessage()));
+ }
+
+ //return ok(new Response("", false, "Could not find the element with id: " + ""));
+
+ }
+
+ @DELETE
+ @Path("/{streamId}/{runningInstanceId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
+
+ DataSetDeclarer runningInstance = RunningDatasetInstances.INSTANCE.getInvocation(runningInstanceId);
+
+ if (runningInstance != null) {
+ boolean detachSuccess = runningInstance.detachRuntime(runningInstanceId);
+ Response resp = new Response("", detachSuccess);
+ if (resp.isSuccess()) {
+ RunningDatasetInstances.INSTANCE.remove(runningInstanceId);
+ }
+
+ return ok(resp);
+ }
+
+ return ok(new Response(runningInstanceId, false, "Could not find the running instance with id: " + runningInstanceId));
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/DataStreamDeclarer.java b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/DataStreamDeclarer.java
index d66d406..9993d4e 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/DataStreamDeclarer.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/DataStreamDeclarer.java
@@ -18,11 +18,10 @@
package org.apache.streampipes.container.declarer;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-public interface DataStreamDeclarer {
+public interface DataStreamDeclarer extends Declarer<SpDataStream> {
- SpDataStream declareModel(DataSourceDescription sep);
+ SpDataStream declareModel();
void executeStream();
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProducerDeclarer.java b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProducerDeclarer.java
index 89214c4..200463c 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProducerDeclarer.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/declarer/SemanticEventProducerDeclarer.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.model.graph.DataSourceDescription;
import java.util.List;
+@Deprecated
public interface SemanticEventProducerDeclarer extends Declarer<DataSourceDescription> {
List<DataStreamDeclarer> getEventStreams();
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/JSONGenerator.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/JSONGenerator.java
index 5190ef7..0841209 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/JSONGenerator.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/JSONGenerator.java
@@ -59,6 +59,7 @@ public class JSONGenerator {
obj.add("name", new JsonPrimitive(d.getName()));
obj.add("description", new JsonPrimitive(d.getDescription()));
obj.add("type", new JsonPrimitive(d.getType()));
+ obj.add("editable", new JsonPrimitive(d.isEditable()));
return obj;
}
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/model/Description.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/model/Description.java
index 09af426..bf76716 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/model/Description.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/model/Description.java
@@ -27,6 +27,7 @@ public class Description {
private URI uri;
private String type;
private String appId;
+ private boolean editable;
public Description(String name, String description, URI uri)
{
@@ -78,6 +79,14 @@ public class Description {
this.appId = appId;
}
+ public boolean isEditable() {
+ return editable;
+ }
+
+ public void setEditable(boolean editable) {
+ this.editable = editable;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventConsumerWelcomePage.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventConsumerWelcomePage.java
index e5623e4..c21aa75 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventConsumerWelcomePage.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventConsumerWelcomePage.java
@@ -1,49 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.container.html.page;
-
-import java.net.URI;
-import java.util.List;
-
-import org.apache.streampipes.container.declarer.SemanticEventConsumerDeclarer;
-import org.apache.streampipes.container.html.model.AgentDescription;
-import org.apache.streampipes.container.html.model.Description;
-
-@Deprecated
-public class EventConsumerWelcomePage extends WelcomePageGenerator<SemanticEventConsumerDeclarer>{
-
-
- public EventConsumerWelcomePage(String baseUri, List<SemanticEventConsumerDeclarer> declarers)
- {
- super(baseUri, declarers);
- }
-
- @Override
- public List<Description> buildUris() {
- for(SemanticEventConsumerDeclarer declarer : declarers)
- {
- Description producer = new AgentDescription();
- producer.setName(declarer.declareModel().getName());
- producer.setDescription(declarer.declareModel().getDescription());
- producer.setUri(URI.create(baseUri +declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
- descriptions.add(producer);
- }
- return descriptions;
- }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// *
+// */
+//
+//package org.apache.streampipes.container.html.page;
+//
+//import java.net.URI;
+//import java.util.List;
+//
+//import org.apache.streampipes.container.declarer.SemanticEventConsumerDeclarer;
+//import org.apache.streampipes.container.html.model.AgentDescription;
+//import org.apache.streampipes.container.html.model.Description;
+//
+//@Deprecated
+//public class EventConsumerWelcomePage extends WelcomePageGenerator<SemanticEventConsumerDeclarer>{
+//
+//
+// public EventConsumerWelcomePage(String baseUri, List<SemanticEventConsumerDeclarer> declarers)
+// {
+// super(baseUri, declarers);
+// }
+//
+// @Override
+// public List<Description> buildUris() {
+// for(SemanticEventConsumerDeclarer declarer : declarers)
+// {
+// Description producer = new AgentDescription();
+// producer.setName(declarer.declareModel().getName());
+// producer.setDescription(declarer.declareModel().getDescription());
+// producer.setUri(URI.create(baseUri +declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
+// descriptions.add(producer);
+// }
+// return descriptions;
+// }
+//}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProcessingAgentWelcomePage.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProcessingAgentWelcomePage.java
index 60005dd..e127d3c 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProcessingAgentWelcomePage.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProcessingAgentWelcomePage.java
@@ -1,49 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.container.html.page;
-
-import java.net.URI;
-import java.util.List;
-
-import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
-import org.apache.streampipes.container.html.model.AgentDescription;
-import org.apache.streampipes.container.html.model.Description;
-
-@Deprecated
-public class EventProcessingAgentWelcomePage extends WelcomePageGenerator<SemanticEventProcessingAgentDeclarer> {
-
- public EventProcessingAgentWelcomePage(String baseUri, List<SemanticEventProcessingAgentDeclarer> declarers)
- {
- super(baseUri, declarers);
- }
-
- @Override
- public List<Description> buildUris() {
- for(SemanticEventProcessingAgentDeclarer declarer : declarers)
- {
- AgentDescription producer = new AgentDescription();
- producer.setName(declarer.declareModel().getName());
- producer.setDescription(declarer.declareModel().getDescription());
- producer.setUri(URI.create(baseUri +declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
- descriptions.add(producer);
- }
- return descriptions;
- }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// *
+// */
+//
+//package org.apache.streampipes.container.html.page;
+//
+//import java.net.URI;
+//import java.util.List;
+//
+//import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
+//import org.apache.streampipes.container.html.model.AgentDescription;
+//import org.apache.streampipes.container.html.model.Description;
+//
+//@Deprecated
+//public class EventProcessingAgentWelcomePage extends WelcomePageGenerator<SemanticEventProcessingAgentDeclarer> {
+//
+// public EventProcessingAgentWelcomePage(String baseUri, List<SemanticEventProcessingAgentDeclarer> declarers)
+// {
+// super(baseUri, declarers);
+// }
+//
+// @Override
+// public List<Description> buildUris() {
+// for(SemanticEventProcessingAgentDeclarer declarer : declarers)
+// {
+// AgentDescription producer = new AgentDescription();
+// producer.setName(declarer.declareModel().getName());
+// producer.setDescription(declarer.declareModel().getDescription());
+// producer.setUri(URI.create(baseUri +declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
+// descriptions.add(producer);
+// }
+// return descriptions;
+// }
+//
+//}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProducerWelcomePage.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProducerWelcomePage.java
index 49472ed..645db26 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProducerWelcomePage.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/EventProducerWelcomePage.java
@@ -1,63 +1,63 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.container.html.page;
-
-import org.apache.streampipes.container.declarer.DataStreamDeclarer;
-import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
-import org.apache.streampipes.container.html.model.AgentDescription;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
-import org.apache.streampipes.container.html.model.Description;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-@Deprecated
-public class EventProducerWelcomePage extends WelcomePageGenerator<SemanticEventProducerDeclarer> {
-
- public EventProducerWelcomePage(String baseUri, List<SemanticEventProducerDeclarer> declarers)
- {
- super(baseUri, declarers);
- }
-
- @Override
- public List<Description> buildUris()
- {
- for(SemanticEventProducerDeclarer declarer : declarers)
- {
- List<Description> streams = new ArrayList<>();
- DataSourceDescriptionHtml description = new DataSourceDescriptionHtml();
- description.setName(declarer.declareModel().getName());
- description.setDescription(declarer.declareModel().getDescription());
- description.setUri(URI.create(baseUri + declarer.declareModel().getUri()));
- for(DataStreamDeclarer streamDeclarer : declarer.getEventStreams())
- {
- AgentDescription ad = new AgentDescription();
- ad.setDescription(streamDeclarer.declareModel(declarer.declareModel()).getDescription());
- ad.setUri(URI.create(baseUri + streamDeclarer.declareModel(declarer.declareModel()).getUri()));
- ad.setName(streamDeclarer.declareModel(declarer.declareModel()).getName());
- streams.add(ad);
- }
- description.setStreams(streams);
- descriptions.add(description);
- }
- return descriptions;
- }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// *
+// */
+//
+//package org.apache.streampipes.container.html.page;
+//
+//import org.apache.streampipes.container.declarer.DataStreamDeclarer;
+//import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
+//import org.apache.streampipes.container.html.model.AgentDescription;
+//import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
+//import org.apache.streampipes.container.html.model.Description;
+//
+//import java.net.URI;
+//import java.util.ArrayList;
+//import java.util.List;
+//
+//@Deprecated
+//public class EventProducerWelcomePage extends WelcomePageGenerator<SemanticEventProducerDeclarer> {
+//
+// public EventProducerWelcomePage(String baseUri, List<SemanticEventProducerDeclarer> declarers)
+// {
+// super(baseUri, declarers);
+// }
+//
+// @Override
+// public List<Description> buildUris()
+// {
+// for(SemanticEventProducerDeclarer declarer : declarers)
+// {
+// List<Description> streams = new ArrayList<>();
+// DataSourceDescriptionHtml description = new DataSourceDescriptionHtml();
+// description.setName(declarer.declareModel().getName());
+// description.setDescription(declarer.declareModel().getDescription());
+// description.setUri(URI.create(baseUri + declarer.declareModel().getUri()));
+// for(DataStreamDeclarer streamDeclarer : declarer.getEventStreams())
+// {
+// AgentDescription ad = new AgentDescription();
+// ad.setDescription(streamDeclarer.declareModel(declarer.declareModel()).getDescription());
+// ad.setUri(URI.create(baseUri + streamDeclarer.declareModel(declarer.declareModel()).getUri()));
+// ad.setName(streamDeclarer.declareModel(declarer.declareModel()).getName());
+// streams.add(ad);
+// }
+// description.setStreams(streams);
+// descriptions.add(description);
+// }
+// return descriptions;
+// }
+//
+//}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGeneratorImpl.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGeneratorImpl.java
index 25eea2d..1db76b8 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGeneratorImpl.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGeneratorImpl.java
@@ -19,9 +19,10 @@
package org.apache.streampipes.container.html.page;
import org.apache.streampipes.container.declarer.*;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
import org.apache.streampipes.container.html.model.Description;
import org.apache.streampipes.container.locales.LabelGenerator;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -31,82 +32,74 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-public class WelcomePageGeneratorImpl extends WelcomePageGenerator<Declarer> {
+public class WelcomePageGeneratorImpl extends WelcomePageGenerator<Declarer<?>> {
- public WelcomePageGeneratorImpl(String baseUri, Collection<Declarer> declarers) {
- super(baseUri, declarers);
- }
-
- @Override
- public List<Description> buildUris() {
- List<Description> descriptions = new ArrayList<>();
+ public WelcomePageGeneratorImpl(String baseUri, Collection<Declarer<?>> declarers) {
+ super(baseUri, declarers);
+ }
- for (Declarer declarer : declarers) {
- if (declarer instanceof InvocableDeclarer) {
- descriptions.add(getDescription((InvocableDeclarer) declarer));
- } else if (declarer instanceof SemanticEventProducerDeclarer) {
- descriptions.add(getDescription((SemanticEventProducerDeclarer) declarer));
- } else if (declarer instanceof PipelineTemplateDeclarer) {
- descriptions.add(getDescription(declarer));
- }
- }
- return descriptions;
- }
+ @Override
+ public List<Description> buildUris() {
+ List<Description> descriptions = new ArrayList<>();
- private Description getDescription(Declarer declarer) {
- Description desc = new Description();
- // TODO remove after full internationalization support has been implemented
- updateLabel(declarer.declareModel(), desc);
- desc.setType(getType(declarer));
- desc.setAppId(declarer.declareModel().getAppId());
- String uri = baseUri;
- if (declarer instanceof SemanticEventConsumerDeclarer) {
- uri += "sec/";
- } else if (declarer instanceof SemanticEventProcessingAgentDeclarer) {
- uri += "sepa/";
- } else if (declarer instanceof PipelineTemplateDeclarer) {
- uri += "template/";
- }
- desc.setUri(URI.create(uri +declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
- return desc;
+ for (Declarer<?> declarer : declarers) {
+ if (declarer instanceof InvocableDeclarer) {
+ descriptions.add(getDescription(declarer));
+ } else if (declarer instanceof DataStreamDeclarer) {
+ descriptions.add(getDescription(declarer));
+ } else if (declarer instanceof PipelineTemplateDeclarer) {
+ descriptions.add(getDescription(declarer));
+ }
}
+ return descriptions;
+ }
- private String getType(Declarer declarer) {
- if (declarer.declareModel() instanceof DataSinkDescription) return "action";
- else return "sepa";
+ private Description getDescription(Declarer<?> declarer) {
+ Description desc = new Description();
+ // TODO remove after full internationalization support has been implemented
+ updateLabel(declarer.declareModel(), desc);
+ desc.setType(getType(declarer));
+ desc.setAppId(declarer.declareModel().getAppId());
+ desc.setEditable(!(declarer.declareModel().isInternallyManaged()));
+ String uri = baseUri;
+ if (declarer instanceof SemanticEventConsumerDeclarer) {
+ uri += "sec/";
+ } else if (declarer instanceof SemanticEventProcessingAgentDeclarer) {
+ uri += "sepa/";
+ } else if (declarer instanceof DataStreamDeclarer) {
+ uri += "stream/";
+ } else if (declarer instanceof PipelineTemplateDeclarer) {
+ uri += "template/";
}
+ desc.setUri(URI.create(uri + declarer.declareModel().getUri().replaceFirst("[a-zA-Z]{4}://[a-zA-Z\\.]+:\\d+/", "")));
+ return desc;
+ }
- private Description getDescription(SemanticEventProducerDeclarer declarer) {
- List<Description> streams = new ArrayList<>();
- DataSourceDescriptionHtml desc = new DataSourceDescriptionHtml();
- updateLabel(declarer.declareModel(), desc);
- desc.setUri(URI.create(baseUri + "sep/" + declarer.declareModel().getUri()));
- desc.setType("source");
- desc.setAppId(declarer.declareModel().getAppId());
- for (DataStreamDeclarer streamDeclarer : declarer.getEventStreams()) {
- Description ad = new Description();
- updateLabel(streamDeclarer.declareModel(declarer.declareModel()), ad);
- ad.setUri(URI.create(baseUri +"stream/" + streamDeclarer.declareModel(declarer.declareModel()).getUri()));
- ad.setType("stream");
- streams.add(ad);
- }
- desc.setStreams(streams);
- return desc;
+ private String getType(Declarer<?> declarer) {
+ if (declarer.declareModel() instanceof DataSinkDescription) {
+ return "action";
+ } else if (declarer.declareModel() instanceof SpDataSet) {
+ return "set";
+ } else if (declarer.declareModel() instanceof SpDataStream) {
+ return "stream";
+ } else {
+ return "sepa";
}
+ }
- private void updateLabel(NamedStreamPipesEntity entity, Description desc) {
- if (!entity.isIncludesLocales()) {
- desc.setName(entity.getName());
- desc.setDescription(entity.getDescription());
- } else {
- LabelGenerator lg = new LabelGenerator(entity);
- try {
- desc.setName(lg.getElementTitle());
- desc.setDescription(lg.getElementDescription());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
+ private void updateLabel(NamedStreamPipesEntity entity, Description desc) {
+ if (!entity.isIncludesLocales()) {
+ desc.setName(entity.getName());
+ desc.setDescription(entity.getDescription());
+ } else {
+ LabelGenerator lg = new LabelGenerator(entity);
+ try {
+ desc.setName(lg.getElementTitle());
+ desc.setDescription(lg.getElementDescription());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
+ }
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
index 3ac7050..4befd89 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/init/DeclarersSingleton.java
@@ -41,7 +41,6 @@ public class DeclarersSingleton {
private static final String Slash = "/";
private Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers;
- private Map<String, SemanticEventProducerDeclarer> producerDeclarers;
private Map<String, SemanticEventConsumerDeclarer> consumerDeclarers;
private Map<String, PipelineTemplateDeclarer> pipelineTemplateDeclarers;
private Map<String, DataStreamDeclarer> streamDeclarers;
@@ -56,7 +55,6 @@ public class DeclarersSingleton {
private DeclarersSingleton() {
this.epaDeclarers = new HashMap<>();
- this.producerDeclarers = new HashMap<>();
this.consumerDeclarers = new HashMap<>();
this.streamDeclarers = new HashMap<>();
this.pipelineTemplateDeclarers = new HashMap<>();
@@ -81,8 +79,8 @@ public class DeclarersSingleton {
public DeclarersSingleton add(Declarer d) {
if (d instanceof SemanticEventProcessingAgentDeclarer) {
addEpaDeclarer((SemanticEventProcessingAgentDeclarer) d);
- } else if (d instanceof SemanticEventProducerDeclarer) {
- addProducerDeclarer((SemanticEventProducerDeclarer) d);
+ } else if (d instanceof DataStreamDeclarer) {
+ addStreamDeclarer((DataStreamDeclarer) d);
} else if (d instanceof SemanticEventConsumerDeclarer) {
addConsumerDeclarer((SemanticEventConsumerDeclarer) d);
} else if (d instanceof PipelineTemplateDeclarer) {
@@ -92,10 +90,10 @@ public class DeclarersSingleton {
return getInstance();
}
- public Map<String, Declarer> getDeclarers() {
- Map<String, Declarer> result = new HashMap<>();
+ public Map<String, Declarer<?>> getDeclarers() {
+ Map<String, Declarer<?>> result = new HashMap<>();
result.putAll(epaDeclarers);
- result.putAll(producerDeclarers);
+ result.putAll(streamDeclarers);
result.putAll(consumerDeclarers);
result.putAll(pipelineTemplateDeclarers);
return result;
@@ -146,11 +144,9 @@ public class DeclarersSingleton {
epaDeclarers.put(epaDeclarer.declareModel().getAppId(), epaDeclarer);
}
- private void addProducerDeclarer(SemanticEventProducerDeclarer sourceDeclarer) {
- checkAndStartExecutableStreams(sourceDeclarer);
- producerDeclarers.put(sourceDeclarer.declareModel().getAppId(), sourceDeclarer);
- sourceDeclarer.getEventStreams().forEach(sd ->
- streamDeclarers.put(sd.declareModel(sourceDeclarer.declareModel()).getAppId(), sd));
+ private void addStreamDeclarer(DataStreamDeclarer streamDeclarer) {
+ streamDeclarers.put(streamDeclarer.declareModel().getAppId(), streamDeclarer);
+ checkAndStartExecutableStreams(streamDeclarer);
}
private void addConsumerDeclarer(SemanticEventConsumerDeclarer consumerDeclarer) {
@@ -166,8 +162,8 @@ public class DeclarersSingleton {
return epaDeclarers;
}
- public Map<String, SemanticEventProducerDeclarer> getProducerDeclarers() {
- return producerDeclarers;
+ public Map<String, DataStreamDeclarer> getStreamDeclarers() {
+ return streamDeclarers;
}
public Map<String, SemanticEventConsumerDeclarer> getConsumerDeclarers() {
@@ -193,10 +189,6 @@ public class DeclarersSingleton {
.collect(Collectors.toList());
}
- public Map<String, DataStreamDeclarer> getStreamDeclarers() {
- return streamDeclarers;
- }
-
public void setPort(int port) {
this.port = port;
}
@@ -213,13 +205,9 @@ public class DeclarersSingleton {
return Http + hostName + Colon + port + route;
}
- private void checkAndStartExecutableStreams(SemanticEventProducerDeclarer sourceDeclarer) {
- sourceDeclarer.getEventStreams()
- .stream()
- .filter(DataStreamDeclarer::isExecutable)
- .forEach(es -> {
- es.declareModel(sourceDeclarer.declareModel());
- es.executeStream();
- });
+ private void checkAndStartExecutableStreams(DataStreamDeclarer declarer) {
+ if (declarer.isExecutable()) {
+ declarer.executeStream();
+ }
}
}
diff --git a/streampipes-container/src/test/java/de/fzi/cep/sepa/html/page/WelcomePageGeneratorImplTest.java b/streampipes-container/src/test/java/de/fzi/cep/sepa/html/page/WelcomePageGeneratorImplTest.java
deleted file mode 100644
index 05ff3c4..0000000
--- a/streampipes-container/src/test/java/de/fzi/cep/sepa/html/page/WelcomePageGeneratorImplTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package de.fzi.cep.sepa.html.page;
-
-import org.junit.Test;
-import org.apache.streampipes.container.declarer.DataStreamDeclarer;
-import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
-import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
-import org.apache.streampipes.container.html.model.Description;
-import org.apache.streampipes.container.html.page.WelcomePageGenerator;
-import org.apache.streampipes.container.html.page.WelcomePageGeneratorImpl;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-public class WelcomePageGeneratorImplTest {
-
- @Test
- public void buildUrisWithEmptyListTest() {
- WelcomePageGenerator wpg = new WelcomePageGeneratorImpl("baseUri", new ArrayList<>());
- List<Description> actual = wpg.buildUris();
-
- assertEquals(actual.size(), 0);
- }
-
- @Test
- public void buildUrisWithSepaTest() {
- WelcomePageGenerator wpg = new WelcomePageGeneratorImpl("baseUri/", Arrays.asList(getSepaDeclarer()));
- List<Description> actual = wpg.buildUris();
- Description expected = new Description("sepaname", "sepadescription", URI.create("baseUri/sepa/sepapathName"));
-
- assertEquals(1, actual.size());
- assertEquals(expected, actual.get(0));
- }
-
- @Test
- public void buildUrisWithSepTest() {
- WelcomePageGenerator wpg = new WelcomePageGeneratorImpl("baseUri/", Arrays.asList(getSepdDeclarer()));
- List<Description> actual = wpg.buildUris();
- Description expected = new Description("sepname", "sepdescription", URI.create("baseUri/sep/seppathName"));
-
- assertEquals(actual.size(), 1);
- Description desc = actual.get(0);
- assertEquals(expected.getName(), desc.getName());
- assertEquals(expected.getDescription(), desc.getDescription());
- assertEquals(expected.getUri(), desc.getUri());
-
- assertThat(desc, instanceOf(DataSourceDescriptionHtml.class));
-
- DataSourceDescriptionHtml sepDesc = (DataSourceDescriptionHtml) desc;
- assertEquals(1, sepDesc.getStreams().size());
- Description expectedStream = new Description("streamname", "streamdescription", URI.create("baseUri/stream/streampathName"));
-
- assertEquals(expectedStream, sepDesc.getStreams().get(0));
- }
-
- private SemanticEventProcessingAgentDeclarer getSepaDeclarer() {
- return new SemanticEventProcessingAgentDeclarer() {
- @Override
- public Response invokeRuntime(DataProcessorInvocation invocationGraph) {
- return null;
- }
-
- @Override
- public Response detachRuntime(String pipelineId) {
- return null;
- }
-
- @Override
- public DataProcessorDescription declareModel() {
- return new DataProcessorDescription("sepapathName", "sepaname", "sepadescription", "iconUrl");
- }
- };
- }
-
- private SemanticEventProducerDeclarer getSepdDeclarer() {
- return new SemanticEventProducerDeclarer() {
- @Override
- public List<DataStreamDeclarer> getEventStreams() {
- return Arrays.asList(new DataStreamDeclarer() {
- @Override
- public SpDataStream declareModel(DataSourceDescription sep) {
- return new SpDataStream("streampathName", "streamname", "streamdescription", null);
- }
-
- @Override
- public void executeStream() {
-
- }
-
- @Override
- public boolean isExecutable() {
- return false;
- }
- });
- }
-
- @Override
- public DataSourceDescription declareModel() {
- return new DataSourceDescription("seppathName", "sepname", "sepdescription", "sepiconUrl");
- }
- };
- }
-
-}
\ No newline at end of file
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
index 8fa1b50..a962d46 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
+++ b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
@@ -26,9 +26,11 @@ public class RdfEndpointItem {
private String description;
private String uri;
private String type;
- private boolean installed;
private String appId;
+ private boolean installed;
+ private boolean editable;
+
private List<RdfEndpointItem> streams;
public RdfEndpointItem() {
@@ -90,4 +92,12 @@ public class RdfEndpointItem {
public void setAppId(String appId) {
this.appId = appId;
}
+
+ public boolean isEditable() {
+ return editable;
+ }
+
+ public void setEditable(boolean editable) {
+ this.editable = editable;
+ }
}
diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml
index 85eadf4..5f50cd6 100644
--- a/streampipes-model/pom.xml
+++ b/streampipes-model/pom.xml
@@ -90,7 +90,7 @@
<plugin>
<groupId>cz.habarta.typescript-generator</groupId>
<artifactId>typescript-generator-maven-plugin</artifactId>
- <version>2.24.612</version>
+ <version>2.27.744</version>
<executions>
<execution>
<id>generate</id>
@@ -109,7 +109,7 @@
<annotation>org.apache.streampipes.model.shared.annotation.TsIgnore</annotation>
</excludePropertyAnnotations>
<excludeClasses>
- <class>io.fogsy.empire.core.empire.SupportsRdfId</class>
+ <class>io.fogsy.empire.api.SupportsRdfId</class>
<class>java.io.Serializable</class>
</excludeClasses>
<customTypeMappings>
@@ -130,4 +130,4 @@
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStream.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStream.java
index f6c09b7..5e3b483 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStream.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStream.java
@@ -79,6 +79,9 @@ public class SpDataStream extends NamedStreamPipesEntity {
@RdfProperty(StreamPipes.INDEX)
private int index;
+ @RdfProperty(StreamPipes.HAS_CORRESPONDING_ADAPTER_ID)
+ private String correspondingAdapterId;
+
protected List<String> category;
public SpDataStream(String uri, String name, String description, String iconUrl, List<EventStreamQualityDefinition> hasEventStreamQualities,
@@ -103,6 +106,7 @@ public class SpDataStream extends NamedStreamPipesEntity {
public SpDataStream(SpDataStream other) {
super(other);
this.index = other.getIndex();
+ this.correspondingAdapterId = other.getCorrespondingAdapterId();
if (other.getEventGrounding() != null) {
this.eventGrounding = new EventGrounding(other.getEventGrounding());
}
@@ -190,4 +194,12 @@ public class SpDataStream extends NamedStreamPipesEntity {
public void setIndex(int index) {
this.index = index;
}
+
+ public String getCorrespondingAdapterId() {
+ return correspondingAdapterId;
+ }
+
+ public void setCorrespondingAdapterId(String correspondingAdapterId) {
+ this.correspondingAdapterId = correspondingAdapterId;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/AbstractStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/AbstractStreamPipesEntity.java
index a392f10..2b06748 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/AbstractStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/AbstractStreamPipesEntity.java
@@ -25,6 +25,7 @@ import io.fogsy.empire.annotations.Namespaces;
import io.fogsy.empire.annotations.RdfId;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.api.SupportsRdfId;
+import org.apache.streampipes.model.shared.annotation.TsIgnore;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.vocabulary.*;
@@ -56,6 +57,7 @@ public class AbstractStreamPipesEntity implements SupportsRdfId, Serializable {
@SuppressWarnings("rawtypes")
@Override
@JsonIgnore
+ @TsIgnore
public RdfKey getRdfId() {
return new URIKey(URI.create(getElementId()));
}
@@ -63,6 +65,7 @@ public class AbstractStreamPipesEntity implements SupportsRdfId, Serializable {
@SuppressWarnings("rawtypes")
@Override
@JsonIgnore
+ @TsIgnore
public void setRdfId(RdfKey arg0) {
this.elementId = arg0.toString();
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
index 24e35ee..0e7a30e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
@@ -71,6 +71,9 @@ public abstract class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
@RdfProperty(StreamPipes.HAS_APPLICATION_LINK)
private List<ApplicationLink> applicationLinks;
+ @RdfProperty(StreamPipes.IS_INTERNALLY_MANAGED)
+ private boolean internallyManaged;
+
protected String DOM;
protected List<String> connectedTo;
@@ -109,6 +112,7 @@ public abstract class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
this.iconUrl = other.getIconUrl();
this.elementId = other.getElementId();
this.DOM = other.getDOM();
+ this.internallyManaged = other.isInternallyManaged();
this.connectedTo = other.getConnectedTo();
if (other.getApplicationLinks() != null) {
this.applicationLinks = new Cloner().al(other.getApplicationLinks());
@@ -222,4 +226,11 @@ public abstract class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
this.includedLocales = includedLocales;
}
+ public boolean isInternallyManaged() {
+ return internallyManaged;
+ }
+
+ public void setInternallyManaged(boolean internallyManaged) {
+ this.internallyManaged = internallyManaged;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterStreamDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterStreamDescription.java
index 2dd8de0..4293e5e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterStreamDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterStreamDescription.java
@@ -49,7 +49,9 @@ public abstract class AdapterStreamDescription extends AdapterDescription {
public AdapterStreamDescription(AdapterStreamDescription other) {
super(other);
- if (other.getDataStream() != null) this.setDataStream(new SpDataStream(other.getDataStream()));
+ if (other.getDataStream() != null) {
+ this.dataStream = new SpDataStream(other.getDataStream());
+ }
}
public SpDataStream getDataStream() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSourceDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSourceDescription.java
index 86aa075..a37db0d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSourceDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSourceDescription.java
@@ -40,6 +40,7 @@ import java.util.List;
@RdfsClass(StreamPipes.DATA_SOURCE_DESCRIPTION)
@Entity
@TsModel
+@Deprecated
public class DataSourceDescription extends NamedStreamPipesEntity {
private static final long serialVersionUID = 5607030219013954697L;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index c1f5e5b..680d5bc 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -180,11 +180,11 @@ public class Cloner {
}
public List<SpDataStream> seq(List<SpDataStream> spDataStreams) {
- return spDataStreams.stream().map(s -> mapSequence(s)).collect(Collectors.toList());
+ return spDataStreams.stream().map(this::mapSequence).collect(Collectors.toList());
}
public List<SpDataStream> streams(List<SpDataStream> spDataStreams) {
- return spDataStreams.stream().map(s -> new SpDataStream(s)).collect(Collectors.toList());
+ return spDataStreams.stream().map(this::mapSequence).collect(Collectors.toList());
}
public SpDataStream mapSequence(SpDataStream seq) {
@@ -201,7 +201,7 @@ public class Cloner {
public List<OutputStrategy> strategies(List<OutputStrategy> outputStrategies) {
if (outputStrategies != null) {
- return outputStrategies.stream().map(o -> outputStrategy(o)).collect(Collectors.toList());
+ return outputStrategies.stream().map(this::outputStrategy).collect(Collectors.toList());
} else {
return new ArrayList<>();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..2f21a55 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -20,7 +20,6 @@ package org.apache.streampipes.manager.execution.http;
import org.lightcouch.DocumentConflictException;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -86,10 +85,6 @@ public class PipelineExecutor {
PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
- if (monitor) {
- SepMonitoringManager.addObserver(pipeline.getPipelineId());
- }
-
if (storeStatus) {
setPipelineStarted(pipeline);
}
@@ -150,10 +145,6 @@ public class PipelineExecutor {
PipelineStatusMessageType.PIPELINE_STOPPED.title(),
PipelineStatusMessageType.PIPELINE_STOPPED.description()));
- if (monitor) {
- SepMonitoringManager.removeObserver(pipeline.getPipelineId());
- }
-
}
return status;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/SepMonitoringManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/SepMonitoringManager.java
deleted file mode 100644
index f99b751..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/status/SepMonitoringManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.status;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.streampipes.manager.monitoring.runtime.PipelineObserver;
-import org.apache.streampipes.manager.monitoring.runtime.SepStoppedMonitoring;
-
-public class SepMonitoringManager {
-
- public static SepStoppedMonitoring SepMonitoring;
-
- private static Map<String, PipelineObserver> observers;
-
- static {
- SepMonitoring = new SepStoppedMonitoring();
- observers = new HashMap<>();
- Thread thread = new Thread(SepMonitoring);
- thread.start();
- }
-
- public static void addObserver(String pipelineId) {
- PipelineObserver observer = new PipelineObserver(pipelineId);
- observers.put(pipelineId, observer);
- SepMonitoring.register(observer);
- }
-
- public static void removeObserver(String pipelineId) {
- SepMonitoring.remove(observers.get(pipelineId));
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/JobManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/JobManager.java
deleted file mode 100644
index 4082b9c..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/JobManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.streampipes.storage.management.StorageManager;
-
-public enum JobManager {
-
- INSTANCE;
-
- private List<MonitoringJob<?>> currentJobs;
-
- private final ScheduledExecutorService scheduler;
-
- JobManager()
- {
- this.currentJobs = new ArrayList<>();
- this.scheduler = Executors.newScheduledThreadPool(2);
- }
-
- public void addJob(MonitoringJob<?> job)
- {
- currentJobs.add(job);
- scheduler.scheduleAtFixedRate(new MonitoringJobExecutor(job), 10, 10, TimeUnit.MINUTES);
- }
-
- public void removeJob(MonitoringJob<?> job)
- {
- currentJobs.remove(job);
- }
-
- public List<MonitoringJob<?>> getCurrentJobs()
- {
- return currentJobs;
- }
-
- public void prepareMonitoring() {
- StorageManager.INSTANCE.getPipelineElementStorage().getAllDataProcessors().forEach(s -> addJob(new SepaMonitoringJob(s)));
- StorageManager.INSTANCE.getPipelineElementStorage().getAllDataSinks().forEach(s -> addJob(new SecMonitoringJob(s)));
- //TODO: add seps StorageManager.INSTANCE.getStorageAPI().getAllSECs().forEach(s -> currentJobs.add(new SecMonitoringJob(s)));
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJob.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJob.java
deleted file mode 100644
index 7dbcfcb..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJob.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.streampipes.manager.monitoring.task.GetDescriptionTask;
-import org.apache.streampipes.manager.monitoring.task.TaskDefinition;
-import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
-import org.apache.streampipes.model.client.monitoring.TaskReport;
-
-public abstract class MonitoringJob<T extends ConsumableStreamPipesEntity> {
-
- protected T monitoredObject;
- protected List<TaskDefinition> tasks;
-
- protected long repeatAfter;
- protected boolean success;
-
- protected String elementId;
-
- public MonitoringJob(T monitoredObject, long repeatAfter)
- {
- this.monitoredObject = monitoredObject;
- this.tasks = new ArrayList<>();
- this.repeatAfter = repeatAfter;
- this.elementId = monitoredObject.getUri();
- }
-
- public MonitoringJob(T monitoredObject)
- {
- this.monitoredObject = monitoredObject;
- this.tasks = new ArrayList<>();
- this.repeatAfter = 6000;
- }
-
- protected void getDefinedTasks()
- {
- this.tasks.add(new GetDescriptionTask(monitoredObject));
- }
-
- public String getElementId() {
- return elementId;
- }
-
- public List<TaskReport> performJobExecution()
- {
- getDefinedTasks();
- List<TaskReport> reports = new ArrayList<>();
- this.tasks.forEach(t -> reports.add(t.execute()));
-
- this.success = reports.stream().anyMatch(r -> !r.isSuccess());
-
- return reports;
- }
-
- public boolean isJobExecutionSuccessful()
- {
- return success;
- }
-
- protected abstract void generateInvocableSepaElement();
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJobExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJobExecutor.java
deleted file mode 100644
index c62fcca..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringJobExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import com.google.gson.Gson;
-import org.apache.streampipes.model.client.monitoring.JobReport;
-import org.apache.streampipes.model.client.monitoring.TaskReport;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.Date;
-import java.util.List;
-
-public class MonitoringJobExecutor implements Runnable {
-
- MonitoringJob<?> job;
-
- public MonitoringJobExecutor(MonitoringJob<?> job) {
- this.job = job;
- }
-
- @Override
- public void run() {
- List<TaskReport> reports = job.performJobExecution();
- reports.forEach(r -> System.out.println(r.toString()));
- JobReport jobReport = new JobReport(job.getElementId(), new Date(), reports);
- StorageDispatcher.INSTANCE.getNoSqlStore().getMonitoringDataStorageApi().storeJobReport(jobReport);
- System.out.println(new Gson().toJson(jobReport));
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringUtils.java
deleted file mode 100644
index c227d3b..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/MonitoringUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.fluent.Request;
-
-public class MonitoringUtils {
-
- public static String randomKey()
- {
- return RandomStringUtils.randomAlphabetic(5);
- }
-
- public static HttpResponse getHttpResponse(String url) throws ClientProtocolException, IOException
- {
- return Request.Get(url).execute().returnResponse();
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SecMonitoringJob.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SecMonitoringJob.java
deleted file mode 100644
index 819b993..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SecMonitoringJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import org.apache.streampipes.model.graph.DataSinkDescription;
-
-public class SecMonitoringJob extends MonitoringJob<DataSinkDescription>{
-
- public SecMonitoringJob(DataSinkDescription dataSinkDescription) {
- super(dataSinkDescription);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void generateInvocableSepaElement() {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SepaMonitoringJob.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SepaMonitoringJob.java
deleted file mode 100644
index a825c08..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/job/SepaMonitoringJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.job;
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-
-public class SepaMonitoringJob extends MonitoringJob<DataProcessorDescription>{
-
- public SepaMonitoringJob(DataProcessorDescription monitoredObject) {
- super(monitoredObject);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void generateInvocableSepaElement() {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
deleted file mode 100644
index c6e991f..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EpRuntimeMonitoring.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EpRuntimeMonitoring.java
deleted file mode 100644
index 92e7f54..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EpRuntimeMonitoring.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-
-public interface EpRuntimeMonitoring<T> {
-
- boolean register(PipelineObserver observer);
- boolean remove(PipelineObserver observer);
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EventGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EventGenerator.java
deleted file mode 100644
index 4cdd2da..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/EventGenerator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.manager.monitoring.job.MonitoringUtils;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class EventGenerator {
-
- protected EventSchema schema;
- private FormatGenerator formatGenerator;
-
- public EventGenerator(EventSchema schema, FormatGenerator formatGenerator)
- {
- this.schema = schema;
- this.formatGenerator = formatGenerator;
- }
-
- public Object nextEvent()
- {
- return formatGenerator.makeOutputFormat(makeEvent(new HashMap<>(), schema.getEventProperties()));
- }
-
- protected Map<String, Object> makeEvent(Map<String, Object> map, List<EventProperty> properties)
- {
- for(EventProperty p : properties)
- {
- if (p instanceof EventPropertyPrimitive) map.put(randomKey(), makePrimitiveProperty((EventPropertyPrimitive) p));
- else if (p instanceof EventPropertyNested) map.put(randomKey(), makeNestedProperty((EventPropertyNested) p));
- else if (p instanceof EventPropertyList) map.put(randomKey(), makeListProperty((EventPropertyList) p));
- else throw new IllegalArgumentException("Wrong type detected");
- }
-
- return map;
- }
-
- private String randomKey()
- {
- return MonitoringUtils.randomKey();
- }
-
- protected abstract Map<String, Object> makeNestedProperty(EventPropertyNested nested);
-
- protected abstract Object makePrimitiveProperty(EventPropertyPrimitive primitive);
-
- protected abstract List<?> makeListProperty(EventPropertyList list);
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/FormatGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/FormatGenerator.java
deleted file mode 100644
index 23aa743..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/FormatGenerator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import java.util.Map;
-
-public interface FormatGenerator {
-
- Object makeOutputFormat(Map<String, Object> event);
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/JsonFormatGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/JsonFormatGenerator.java
deleted file mode 100644
index b59f798..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/JsonFormatGenerator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-public class JsonFormatGenerator {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineObserver.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineObserver.java
deleted file mode 100644
index 59e4639..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineObserver.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-
-public class PipelineObserver {
-
- private String pipelineId;
-
-
- public PipelineObserver(String pipelineId) {
- super();
- this.pipelineId = pipelineId;
- }
-
- public void update() {
- System.out.println(pipelineId + " was updated yeah!!");
- PipelineStatusManager.addPipelineStatus(pipelineId, makePipelineStatusMessage(PipelineStatusMessageType.PIPELINE_NO_DATA));
-
- SimilarStreamFinder streamFinder = new SimilarStreamFinder(pipelineId);
- if (streamFinder.isReplacable()) {
- System.out.println("Pipeline replacable");
-
- boolean success = new PipelineStreamReplacer(pipelineId, streamFinder.getSimilarStreams().get(0)).replaceStream();
- if (success) {
- System.out.println("success");
- PipelineStatusManager.addPipelineStatus(pipelineId, makePipelineStatusMessage(PipelineStatusMessageType.PIPELINE_EXCHANGE_SUCCESS));
- } else {
- System.out.println("failure");
- PipelineStatusManager.addPipelineStatus(pipelineId, makePipelineStatusMessage(PipelineStatusMessageType.PIPELINE_EXCHANGE_FAILURE));
- }
- } else {
- System.out.println("Pipeline not replacable");
- PipelineStatusManager.addPipelineStatus(pipelineId, makePipelineStatusMessage(PipelineStatusMessageType.PIPELINE_EXCHANGE_FAILURE));
- }
- }
-
- private PipelineStatusMessage makePipelineStatusMessage(PipelineStatusMessageType type) {
- return new PipelineStatusMessage(pipelineId, System.currentTimeMillis(), type.title(), type.description());
- }
-
- public String getPipelineId() {
- return pipelineId;
- }
- public void setPipelineId(String pipelineId) {
- this.pipelineId = pipelineId;
- }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((pipelineId == null) ? 0 : pipelineId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- PipelineObserver other = (PipelineObserver) obj;
- if (pipelineId == null) {
- if (other.pipelineId != null)
- return false;
- } else if (!pipelineId.equals(other.pipelineId))
- return false;
- return true;
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineStreamReplacer.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineStreamReplacer.java
deleted file mode 100644
index f0c34de..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/PipelineStreamReplacer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.storage.management.StorageManager;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-public class PipelineStreamReplacer {
-
- private String pipelineId;
- private SpDataStream streamToReplace;
-
- public PipelineStreamReplacer(String pipelineId, SpDataStream streamToReplace) {
- this.pipelineId = pipelineId;
- this.streamToReplace = streamToReplace;
- }
-
- public boolean replaceStream() {
- Pipeline currentPipeline = getPipelineStorage().getPipeline(pipelineId);
- String streamDomId = currentPipeline.getStreams().get(0).getDOM();
- Operations.stopPipeline(currentPipeline);
- currentPipeline = getPipelineStorage().getPipeline(pipelineId);
- try {
- streamToReplace.setDOM(streamDomId);
- currentPipeline.setStreams(Arrays.asList(streamToReplace));
-
- for(DataProcessorInvocation sepaClient : currentPipeline.getSepas()) {
- // TODO
-// for(StaticProperty staticProperty : sepaClient.getStaticProperties()) {
-// if (staticProperty.getType() == StaticPropertyType.CUSTOM_OUTPUT) {
-// CheckboxInput input = (CheckboxInput) staticProperty.getInput();
-// for(Option option : input.getOptions()) {
-// option.setElementId(getElementId(option.getHumanDescription()));
-// }
-// }
-// else if (staticProperty.getType() == StaticPropertyType.MAPPING_PROPERTY) {
-// SelectFormInput input = (SelectFormInput) staticProperty.getInput();
-// for(Option option : input.getOptions()) {
-// option.setElementId(getElementId(option.getHumanDescription()));
-// }
-// }
-// }
- }
- String newPipelineId = UUID.randomUUID().toString();
- currentPipeline.setPipelineId(newPipelineId);
- currentPipeline.setRev(null);
- currentPipeline.setName(currentPipeline.getName() +" (Replacement)");
- getPipelineStorage().storePipeline(currentPipeline);
- Operations.startPipeline(getPipelineStorage().getPipeline(newPipelineId));
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- Operations.startPipeline(getPipelineStorage().getPipeline(pipelineId));
- return false;
- }
-
- }
-
- private IPipelineStorage getPipelineStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
- }
-
- private String getElementId(String humanDescription) throws Exception {
- for(EventProperty p : streamToReplace.getEventSchema().getEventProperties()) {
- if (p.getRuntimeName().equals(humanDescription)) return p.getElementId();
- }
-
- throw new Exception("Property not found");
- }
-
- private DataSourceDescription getSep(SpDataStream streamToReplace2) throws Exception {
- List<DataSourceDescription> seps = StorageManager.INSTANCE.getPipelineElementStorage().getAllDataSources();
-
- for(DataSourceDescription sep : seps) {
- for(SpDataStream stream : sep.getSpDataStreams()) {
- if (stream.getElementId().equals(streamToReplace2.getElementId())) return sep;
- }
- }
-
- throw new Exception("Stream not found");
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ProtocolHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ProtocolHandler.java
deleted file mode 100644
index aaae3ba..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ProtocolHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.commons.messaging.IMessageListener;
-import org.apache.streampipes.commons.messaging.IMessagePublisher;
-
-public interface ProtocolHandler {
-
- IMessagePublisher getPublisher();
-
- IMessageListener getConsumer();
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomDataGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomDataGenerator.java
deleted file mode 100644
index 6c04c27..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomDataGenerator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.vocabulary.XSD;
-
-import java.util.Random;
-
-public class RandomDataGenerator {
-
- private Random random;
-
- public RandomDataGenerator()
- {
- random = new Random();
- }
-
- public Object getValue(EventPropertyPrimitive primitive)
- {
- if (primitive.getRuntimeType().equals(getString())) {
- return RandomStringUtils.randomAlphabetic(10);
- }
- else if (primitive.getRuntimeType().equals(getLong())) {
- return random.nextLong();
- }
- else if (primitive.getRuntimeType().equals(getInt())) {
- return random.nextInt();
- }
- else if (primitive.getRuntimeType().equals(getDouble())) {
- return random.nextDouble();
- }
- else {
- return random.nextBoolean();
- }
- }
-
- private String getString()
- {
- return XSD._string.toString();
- }
-
- private String getLong()
- {
- return XSD._long.toString();
- }
-
- private String getInt()
- {
- return XSD._integer.toString();
- }
-
- private String getDouble()
- {
- return XSD._double.toString();
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomEventGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomEventGenerator.java
deleted file mode 100644
index f43678e..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/RandomEventGenerator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-
-public class RandomEventGenerator extends EventGenerator {
-
- private RandomDataGenerator dataGenerator;
-
- public RandomEventGenerator(EventSchema schema,
- FormatGenerator formatGenerator) {
- super(schema, formatGenerator);
- this.dataGenerator = new RandomDataGenerator();
- }
-
- @Override
- protected Map<String, Object> makeNestedProperty(EventPropertyNested nested) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- protected Object makePrimitiveProperty(EventPropertyPrimitive primitive) {
- return dataGenerator.getValue(primitive);
- }
-
- @Override
- protected List<?> makeListProperty(EventPropertyList list) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SchemaGenerator.java
deleted file mode 100644
index bd70363..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SchemaGenerator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.manager.monitoring.job.MonitoringUtils;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-
-public class SchemaGenerator {
-
- public EventSchema generateSchema(EventSchema schemaRequirement, boolean minimumSchema)
- {
- EventSchema schema = new EventSchema();
-
- for(EventProperty requiredProperty : schemaRequirement.getEventProperties())
- {
- if (requiredProperty instanceof EventPropertyPrimitive)
- schema.addEventProperty(new EventPropertyPrimitive(((EventPropertyPrimitive) requiredProperty).getRuntimeType(), MonitoringUtils.randomKey(), "", requiredProperty.getDomainProperties()));
- //else if (requiredProperty instanceof EventPropertyNested)
- }
- return schema;
- }
-
- private EventProperty addSampleProperty()
- {
- //TODO
- return null;
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoring.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoring.java
deleted file mode 100644
index 2e3301a..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoring.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
-import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
-import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException;
-import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.messaging.InternalEventProcessor;
-import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.storage.couchdb.impl.PipelineStorageImpl;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public class SepStoppedMonitoring implements EpRuntimeMonitoring<DataSourceDescription>, Runnable {
-
- private Map<String, List<PipelineObserver>> streamToObserver;
- private Map<String, Pipeline> streamToStoppedMonitoringPipeline;
- private SpKafkaConsumer kafkaConsumerGroup;
-
- @Override
- public boolean register(PipelineObserver observer) {
-
- try {
- Pipeline pipeline = new PipelineStorageImpl().getPipeline(observer.getPipelineId());
-
- List<SpDataStream> allStreams = new ArrayList<>();
- pipeline.getStreams().forEach((s) -> allStreams.add(s));
-
- for (SpDataStream s : allStreams) {
- if (streamToObserver.get(s.getElementId()) == null) {
- List<PipelineObserver> po = new ArrayList<>();
- po.add(observer);
-
- // TODO fix this s.getSourceId() is always null
- String streamId = s.getElementId();
- String sourceId = streamId.substring(0, streamId.lastIndexOf("/"));
-
- streamToObserver.put(streamId, po);
- Pipeline p = new SepStoppedMonitoringPipelineBuilder(sourceId, streamId).buildPipeline();
- Operations.startPipeline(p, false, false, false);
- streamToStoppedMonitoringPipeline.put(streamId, p);
-
- } else {
- streamToObserver.get(s.getElementId()).add(observer);
- }
- }
-
- } catch (URISyntaxException e) {
- e.printStackTrace();
- } catch (NoMatchingFormatException e) {
- e.printStackTrace();
- } catch (NoMatchingSchemaException e) {
- e.printStackTrace();
- } catch (NoMatchingProtocolException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return false;
- }
-
- @Override
- public boolean remove(PipelineObserver observer) {
-
- Pipeline pipeline = new PipelineStorageImpl().getPipeline(observer.getPipelineId());
- List<SpDataStream> streams = pipeline.getStreams();
-
- for (SpDataStream sc : streams) {
- String streamId = sc.getElementId();
- List<PipelineObserver> po = streamToObserver.get(streamId);
- if (po.size() == 1) {
- streamToObserver.remove(streamId);
-
- Operations.stopPipeline(streamToStoppedMonitoringPipeline.get(streamId), false, false, false);
- streamToStoppedMonitoringPipeline.remove(streamId);
- } else {
- po.remove(observer);
- }
-
- }
-
- return false;
- }
-
- private class KafkaCallback implements InternalEventProcessor<byte[]> {
-
- @Override
- public void onEvent(byte[] payload) {
- String str = new String(payload, StandardCharsets.UTF_8);
- JsonObject jo = new JsonParser().parse(str).getAsJsonObject();
-
- List<PipelineObserver> listPos = streamToObserver.get(jo.get("topic").getAsString());
- for (PipelineObserver po : listPos) {
- po.update();
- }
-
-
- }
-
- }
-
- @Override
- public void run() {
- streamToObserver = new HashMap<>();
- streamToStoppedMonitoringPipeline = new HashMap<>();
-
- String topic = "internal.streamepipes.sec.stopped";
-
- kafkaConsumerGroup = new SpKafkaConsumer(BackendConfig.INSTANCE.getKafkaUrl(), topic,
- new KafkaCallback());
- Thread thread = new Thread(kafkaConsumerGroup);
- thread.start();
-
- }
-
- public static void main(String[] args) throws IOException {
- SepStoppedMonitoring monitoring = new SepStoppedMonitoring();
- monitoring.run();
-
- // Montrac 01
- String id = "baaaf5b2-5412-4ac1-a7eb-04aeaf0e12b8";
- PipelineObserver observer1 = new PipelineObserver(id);
-
- // Montrac 02
- id = "ef915142-2a08-4166-8bea-8d946ae31cd6";
- PipelineObserver observer2 = new PipelineObserver(id);
-
- // Random Number Stream
- id = "b3c0b6ad-05df-4670-a078-83775eeb550b";
- PipelineObserver observer3 = new PipelineObserver(id);
-
- monitoring.register(observer1);
- monitoring.register(observer2);
- monitoring.register(observer3);
-
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- String s = br.readLine();
- monitoring.remove(observer1);
- monitoring.remove(observer2);
- monitoring.remove(observer3);
- System.out.println("laalal");
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoringPipelineBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoringPipelineBuilder.java
deleted file mode 100644
index 6612104..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoringPipelineBuilder.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
-import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
-import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException;
-import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.model.staticproperty.DomainStaticProperty;
-import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
-import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.model.staticproperty.SupportedProperty;
-import org.apache.streampipes.storage.management.StorageManager;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-public class SepStoppedMonitoringPipelineBuilder {
-
- // TODO make ULSs dynamic
-// private final String RATE_SEPA_URI = "http://frosch.fzi.de:8090/sepa/streamStopped";
-// private final String KAFKA_SEC_URI = "http://frosch.fzi.de:8091/kafka";
-
- private final String RATE_SEPA_URI = "http://ipe-koi05.perimeter.fzi.de:8090/sepa/streamStopped";
- private final String KAFKA_SEC_URI = "http://ipe-koi04.perimeter.fzi.de:8091/kafka";
- private final String OUTPUT_TOPIC = "internal.streamepipes.sec.stopped";
-
- private SpDataStream stream;
- private final String outputTopic;
-
- private DataSourceDescription dataSourceDescription;
-
- private DataProcessorDescription streamStoppedDataProcessorDescription;
- private DataSinkDescription kafkaDataSinkDescription;
- private String streamUri;
-
- public SepStoppedMonitoringPipelineBuilder(String sepUri, String streamUri) throws URISyntaxException {
- this.outputTopic = OUTPUT_TOPIC;
- this.streamUri = streamUri;
- DataSourceDescription desc = StorageManager.INSTANCE.getPipelineElementStorage().getDataSourceById(sepUri);
- this.stream = StorageManager.INSTANCE.getPipelineElementStorage().getEventStreamById(streamUri);
- this.dataSourceDescription = desc;
- this.streamStoppedDataProcessorDescription = getStreamStoppedEpa();
- this.kafkaDataSinkDescription = getKafkaPublisherEc();
- }
-
- public Pipeline buildPipeline()
- throws NoMatchingFormatException, NoMatchingSchemaException, NoMatchingProtocolException, Exception {
- DataProcessorInvocation rateSepaClient = new DataProcessorInvocation(streamStoppedDataProcessorDescription);
- SpDataStream streamClient = stream instanceof SpDataStream ? new SpDataStream(stream) : new SpDataSet(
- (SpDataSet) stream);
- DataSinkInvocation kafkaActionClient = new DataSinkInvocation(kafkaDataSinkDescription);
-
- List<NamedStreamPipesEntity> elements = new ArrayList<>();
- elements.add(streamClient);
-
- rateSepaClient.setConnectedTo(Arrays.asList("stream"));
- streamClient.setDOM("stream");
- rateSepaClient.setDOM("rate");
- kafkaActionClient.setDOM("kafka");
-
- Pipeline pipeline = new Pipeline();
- pipeline.setStreams(Arrays.asList(streamClient));
-
- pipeline.setSepas(Arrays.asList(rateSepaClient));
-
- PipelineModificationMessage message = new PipelineVerificationHandler(pipeline).validateConnection()
- .computeMappingProperties().getPipelineModificationMessage();
-
- DataProcessorInvocation updatedSepa = updateStreamStoppedSepa(rateSepaClient, message);
- pipeline.setSepas(Arrays.asList(updatedSepa));
-
- kafkaActionClient.setConnectedTo(Arrays.asList("rate"));
- pipeline.setActions(Arrays.asList(kafkaActionClient));
-
- message = new PipelineVerificationHandler(pipeline).validateConnection().computeMappingProperties()
- .getPipelineModificationMessage();
-
- pipeline.setActions(Arrays.asList(updateKafkaSec(kafkaActionClient, message)));
-
- pipeline.setPipelineId(UUID.randomUUID().toString());
- pipeline.setName("Monitoring - " + stream.getName());
-
- return pipeline;
- }
-
- private DataSinkDescription getKafkaPublisherEc() throws URISyntaxException {
- return StorageManager.INSTANCE.getPipelineElementStorage().getDataSinkById(KAFKA_SEC_URI);
- }
-
- private DataProcessorDescription getStreamStoppedEpa() throws URISyntaxException {
- return StorageManager.INSTANCE.getPipelineElementStorage().getDataProcessorById(RATE_SEPA_URI);
- }
-
- private DataSinkInvocation updateKafkaSec(DataSinkInvocation actionClient, PipelineModificationMessage message) {
- List<StaticProperty> properties = message.getPipelineModifications().get(0).getStaticProperties();
- List<StaticProperty> newStaticProperties = new ArrayList<>();
- for (StaticProperty p : properties) {
- if (p instanceof FreeTextStaticProperty ||p instanceof DomainStaticProperty) {
- if (p instanceof FreeTextStaticProperty) {
- if (p.getInternalName().equals("topic"))
- ((FreeTextStaticProperty) p).setValue(outputTopic);
- }
- else if (p instanceof DomainStaticProperty) {
- for(SupportedProperty sp : ((DomainStaticProperty) p).getSupportedProperties()) {
- if (sp.getPropertyId().equals("http://schema.org/kafkaHost"))
- sp.setValue(String
- .valueOf(BackendConfig.INSTANCE.getKafkaHost()));
- else if (sp.getPropertyId().equals("http://schema.org/kafkaPort"))
- sp.setValue(String
- .valueOf(BackendConfig.INSTANCE.getKafkaPort()));
- }
- }
-
- }
- newStaticProperties.add(p);
- }
- actionClient.setStaticProperties(newStaticProperties);
- return actionClient;
- }
-
- private DataProcessorInvocation updateStreamStoppedSepa(DataProcessorInvocation newSEPA, PipelineModificationMessage message) {
- List<StaticProperty> properties = message.getPipelineModifications().get(0).getStaticProperties();
- List<StaticProperty> newStaticProperties = new ArrayList<>();
- for (StaticProperty p : properties) {
- if (p instanceof FreeTextStaticProperty) {
-
- if (p.getInternalName().equals("topic"))
- ((FreeTextStaticProperty) p).setValue(String.valueOf(streamUri));
-
- }
- newStaticProperties.add(p);
- }
- newSEPA.setStaticProperties(newStaticProperties);
- return newSEPA;
- }
-
- public static void main(String[] args) throws URISyntaxException {
-
- String SEP_URI = "http://frosch.fzi.de:8089//source-wunderbar";
- String STREAM_URI = "http://frosch.fzi.de:8089//source-wunderbar/accelerometer";
-
- SepStoppedMonitoringPipelineBuilder pc = new SepStoppedMonitoringPipelineBuilder(SEP_URI, STREAM_URI);
-
- try {
- Pipeline pipeline = pc.buildPipeline();
- Operations.startPipeline(pipeline, false, false, false);
-
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- String s = br.readLine();
-
- Operations.stopPipeline(pipeline, false, false, false);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SimilarStreamFinder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SimilarStreamFinder.java
deleted file mode 100644
index 07b99e7..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/SimilarStreamFinder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.model.quality.MeasurementCapability;
-import org.apache.streampipes.model.quality.MeasurementObject;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.storage.management.StorageManager;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SimilarStreamFinder {
-
- private Pipeline pipeline;
-
- private List<SpDataStream> similarStreams;
-
- public SimilarStreamFinder(String pipelineId) {
- this.pipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(pipelineId);
- this.similarStreams = new ArrayList<>();
- }
-
- public boolean isReplacable() {
- if (pipeline.getStreams().size() > 1 || pipeline.getStreams().size() == 0) return false;
- else {
- return isSimilarStreamAvailable();
- }
- }
-
- private boolean isSimilarStreamAvailable() {
-
- List<DataSourceDescription> seps = StorageManager.INSTANCE.getPipelineElementStorage().getAllDataSources();
- List<SpDataStream> streams = getEventStreams(seps);
-
- SpDataStream pipelineInputStream = getStream();
- List<MeasurementCapability> pipelineInputStreamCapabilities = pipelineInputStream.getMeasurementCapability();
- List<MeasurementObject> pipelineInputStreamMeasurementObject = pipelineInputStream.getMeasurementObject();
-
- for(SpDataStream stream : streams) {
- if (!stream.getElementId().equals(pipelineInputStream.getElementId())) {
- if (matchesStream(pipelineInputStreamCapabilities, pipelineInputStreamMeasurementObject, stream.getMeasurementCapability(), stream.getMeasurementObject())) {
- similarStreams.add(stream);
- }
- }
- }
-
- return similarStreams.size() > 0;
- }
-
- private boolean matchesStream(
- List<MeasurementCapability> pipelineInputStreamCapabilities,
- List<MeasurementObject> pipelineInputStreamMeasurementObject,
- List<MeasurementCapability> measurementCapability,
- List<MeasurementObject> measurementObject) {
- return matchesCapability(pipelineInputStreamCapabilities, measurementCapability) &&
- matchesObject(pipelineInputStreamMeasurementObject, measurementObject);
- }
-
- private boolean matchesObject(
- List<MeasurementObject> pipelineInputStreamMeasurementObject,
- List<MeasurementObject> measurementObject) {
- if (pipelineInputStreamMeasurementObject == null | measurementObject == null) return false;
- else return pipelineInputStreamMeasurementObject.stream().allMatch(p -> measurementObject.stream().anyMatch(mc -> mc.getMeasuresObject().toString().equals(p.getMeasuresObject().toString())));
- }
-
- private boolean matchesCapability(
- List<MeasurementCapability> pipelineInputStreamCapabilities,
- List<MeasurementCapability> measurementCapability) {
- if (pipelineInputStreamCapabilities == null || measurementCapability == null) return false;
- else return pipelineInputStreamCapabilities.stream().allMatch(p -> measurementCapability.stream().anyMatch(mc -> mc.getCapability().toString().equals(p.getCapability().toString())));
- }
-
- private SpDataStream getStream() {
- String streamId = pipeline.getStreams().get(0).getElementId();
-
- return StorageManager.INSTANCE.getPipelineElementStorage().getEventStreamById(streamId);
- }
-
- private List<SpDataStream> getEventStreams(List<DataSourceDescription> seps) {
- List<SpDataStream> result = new ArrayList<>();
- for(DataSourceDescription sep : seps) {
- result.addAll(sep.getSpDataStreams());
- }
- return result;
- }
-
- public List<SpDataStream> getSimilarStreams() {
- return similarStreams;
- }
-
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StaticPropertyGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StaticPropertyGenerator.java
deleted file mode 100644
index a866157..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StaticPropertyGenerator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-public class StaticPropertyGenerator {
-
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StreamGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StreamGenerator.java
deleted file mode 100644
index 5a2f088..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/StreamGenerator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-public class StreamGenerator {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ThriftFormatGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ThriftFormatGenerator.java
deleted file mode 100644
index eea8ea4..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/runtime/ThriftFormatGenerator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.runtime;
-
-public class ThriftFormatGenerator {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/CompareDescriptionTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/CompareDescriptionTask.java
deleted file mode 100644
index 3c544c5..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/CompareDescriptionTask.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-public class CompareDescriptionTask {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/GetDescriptionTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/GetDescriptionTask.java
deleted file mode 100644
index fad22a5..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/GetDescriptionTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-import java.io.IOException;
-
-import org.apache.http.client.ClientProtocolException;
-
-import org.apache.streampipes.manager.monitoring.job.MonitoringUtils;
-import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
-import org.apache.streampipes.model.client.monitoring.TaskReport;
-
-public class GetDescriptionTask extends TaskDefinition {
-
- private ConsumableStreamPipesEntity element;
-
- private static final String TASK_NAME = "HTTP Get Availability consul";
-
- public GetDescriptionTask(ConsumableStreamPipesEntity element)
- {
- super();
- this.element = element;
- }
-
-
- @Override
- public void executeBefore() {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void executeAfter() {
- // TODO Auto-generated method stub
- }
-
- @Override
- public TaskReport defineTaskExecution() {
- try {
- int statusCode = MonitoringUtils.getHttpResponse(element.getUri()).getStatusLine().getStatusCode();
- if (statusCode == 200) {
- return successMsg(TASK_NAME);
- }
- else
- {
- return errorMsg(TASK_NAME, "Wrong status code");
- }
- } catch (ClientProtocolException e) {
- e.printStackTrace();
- return errorMsg(TASK_NAME, e.getMessage());
- } catch (IOException e) {
- e.printStackTrace();
- return errorMsg(TASK_NAME, e.getMessage());
- }
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/InvokeRuntimeTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/InvokeRuntimeTask.java
deleted file mode 100644
index 3166d93..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/InvokeRuntimeTask.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-public class InvokeRuntimeTask {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/RemoveRuntimeTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/RemoveRuntimeTask.java
deleted file mode 100644
index 035a882..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/RemoveRuntimeTask.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-public class RemoveRuntimeTask {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/TaskDefinition.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/TaskDefinition.java
deleted file mode 100644
index 4194500..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/TaskDefinition.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-import java.util.Date;
-
-import org.apache.streampipes.model.client.monitoring.TaskReport;
-
-public abstract class TaskDefinition {
-
- protected TaskReport results;
-
- public abstract void executeBefore();
-
- public abstract void executeAfter();
-
- public abstract TaskReport defineTaskExecution();
-
- public TaskReport execute()
- {
- executeBefore();
- TaskReport report = defineTaskExecution();
- executeAfter();
- return report;
- }
-
- protected TaskReport successMsg(String taskName)
- {
- return new TaskReport(taskName, new Date(), true, "");
- }
-
- protected TaskReport errorMsg(String taskName, String msg)
- {
- return new TaskReport(taskName, new Date(), false, msg);
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/VerifySchemaTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/VerifySchemaTask.java
deleted file mode 100644
index 6767ebd..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/task/VerifySchemaTask.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.monitoring.task;
-
-public class VerifySchemaTask {
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepaVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepaVerifier.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
index ae8c147..95382ef 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepaVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
@@ -24,9 +24,9 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
import java.io.IOException;
-public class SepaVerifier extends ElementVerifier<DataProcessorDescription> {
+public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescription> {
- public SepaVerifier(String graphData)
+ public DataProcessorVerifier(String graphData)
throws SepaParseException {
super(graphData, DataProcessorDescription.class);
// TODO Auto-generated constructor stub
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SecVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SecVerifier.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
index 3270620..2f3e9b6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SecVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
@@ -24,10 +24,10 @@ import org.apache.streampipes.model.graph.DataSinkDescription;
import java.io.IOException;
-public class SecVerifier extends ElementVerifier<DataSinkDescription> {
+public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
- public SecVerifier(String graphData)
+ public DataSinkVerifier(String graphData)
throws SepaParseException {
super(graphData, DataSinkDescription.class);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
similarity index 76%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepVerifier.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
index 14da852..a87b1b2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/SepVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
@@ -20,14 +20,17 @@ package org.apache.streampipes.manager.verification;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import java.io.IOException;
-public class SepVerifier extends ElementVerifier<DataSourceDescription> {
+public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
- public SepVerifier(String graphData) {
- super(graphData, DataSourceDescription.class);
+ public DataStreamVerifier(String graphData) {
+ super(graphData, SpDataStream.class);
+ }
+
+ public DataStreamVerifier(SpDataStream stream) {
+ super(stream);
}
@Override
@@ -45,7 +48,7 @@ public class SepVerifier extends ElementVerifier<DataSourceDescription> {
}
*/
if (!storageApi.exists(elementDescription)) {
- storageApi.storeDataSource(elementDescription);
+ storageApi.storeDataStream(elementDescription);
if (refreshCache) {
storageApi.refreshDataSourceCache();
}
@@ -68,20 +71,16 @@ public class SepVerifier extends ElementVerifier<DataSourceDescription> {
@Override
protected void storeAssets() throws IOException {
- for (SpDataStream stream : elementDescription.getSpDataStreams()) {
- if (stream.isIncludesAssets()) {
- AssetManager.storeAsset(stream.getElementId(), stream.getAppId());
- }
+ if (elementDescription.isIncludesAssets()) {
+ AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
}
}
@Override
protected void updateAssets() throws IOException {
- for (SpDataStream stream : elementDescription.getSpDataStreams()) {
- if (stream.isIncludesAssets()) {
- AssetManager.deleteAsset(elementDescription.getAppId());
- storeAssets();
- }
+ if (elementDescription.isIncludesAssets()) {
+ AssetManager.deleteAsset(elementDescription.getAppId());
+ storeAssets();
}
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
index 292f870..3603192 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
@@ -46,6 +46,8 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
private String graphData;
private Class<T> elementClass;
+ private boolean shouldTransform;
+
protected T elementDescription;
protected List<VerificationResult> validationResults;
@@ -58,6 +60,14 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
public ElementVerifier(String graphData, Class<T> elementClass) {
this.elementClass = elementClass;
this.graphData = graphData;
+ this.shouldTransform = true;
+ this.validators = new ArrayList<>();
+ this.validationResults = new ArrayList<>();
+ }
+
+ public ElementVerifier(T elementDescription) {
+ this.elementDescription = elementDescription;
+ this.shouldTransform = false;
this.validators = new ArrayList<>();
this.validationResults = new ArrayList<>();
}
@@ -77,11 +87,13 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
}
public Message verifyAndAdd(String username, boolean publicElement, boolean refreshCache) throws SepaParseException {
- try {
- this.elementDescription = transform();
- } catch (RDFParseException | UnsupportedRDFormatException
- | RepositoryException | IOException e) {
- return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification());
+ if (shouldTransform) {
+ try {
+ this.elementDescription = transform();
+ } catch (RDFParseException | UnsupportedRDFormatException
+ | RepositoryException | IOException e) {
+ return new ErrorMessage(NotificationType.UNKNOWN_ERROR.uiNotification());
+ }
}
verify();
if (isVerifiedSuccessfully()) {
@@ -158,7 +170,7 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
}
private boolean isVerifiedSuccessfully() {
- return !validationResults.stream().anyMatch(validator -> (validator instanceof VerificationError));
+ return validationResults.stream().noneMatch(validator -> (validator instanceof VerificationError));
}
protected T transform() throws JsonProcessingException {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
index 7ec4697..b75d6e1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
@@ -21,13 +21,13 @@ package org.apache.streampipes.manager.verification.extractor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streampipes.commons.exceptions.SepaParseException;
+import org.apache.streampipes.manager.verification.DataProcessorVerifier;
+import org.apache.streampipes.manager.verification.DataSinkVerifier;
+import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.manager.verification.ElementVerifier;
-import org.apache.streampipes.manager.verification.SecVerifier;
-import org.apache.streampipes.manager.verification.SepVerifier;
-import org.apache.streampipes.manager.verification.SepaVerifier;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import java.util.logging.Logger;
@@ -57,16 +57,16 @@ public class TypeExtractor {
if (jsonClassName == null) {
throw new SepaParseException();
} else {
- if (jsonClassName.equals(ep())) { logger.info("Detected type sep"); return new SepVerifier(pipelineElementDescription); }
- else if (jsonClassName.equals(epa())) { logger.info("Detected type sepa"); return new SepaVerifier(pipelineElementDescription); }
- else if (jsonClassName.equals(ec())) { logger.info("Detected type sec"); return new SecVerifier(pipelineElementDescription); }
+ if (jsonClassName.equals(ep())) { logger.info("Detected type data stream"); return new DataStreamVerifier(pipelineElementDescription); }
+ else if (jsonClassName.equals(epa())) { logger.info("Detected type data processor"); return new DataProcessorVerifier(pipelineElementDescription); }
+ else if (jsonClassName.equals(ec())) { logger.info("Detected type data sink"); return new DataSinkVerifier(pipelineElementDescription); }
else throw new SepaParseException();
}
}
private static final String ep()
{
- return DataSourceDescription.class.getCanonicalName();
+ return SpDataStream.class.getCanonicalName();
}
private static final String epa()
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
index 56dbc24..1fdbfea 100644
--- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
+++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
@@ -57,7 +57,7 @@ public class TestUtils {
public static Pipeline makePipeline(SemanticEventProducerDeclarer producer, EventStreamDeclarer stream, SemanticEventProcessingAgentDeclarer agent) {
DataSourceDescription dataSourceDescription = new DataSourceDescription(producer.declareModel());
dataSourceDescription.setElementId("http://www.schema.org/test1");
- SpDataStream offer = stream.declareModel(dataSourceDescription);
+ SpDataStream offer = stream.declareModel();
offer.setElementId("http://www.schema.org/test2");
DataProcessorDescription requirement = (agent.declareModel());
requirement.setElementId("http://www.schema.org/test3");
@@ -93,7 +93,7 @@ public class TestUtils {
}
public static SpDataStream makeStream(SemanticEventProducerDeclarer declarer, EventStreamDeclarer streamDec, String domId) {
- SpDataStream stream = new SpDataStream(streamDec.declareModel(declarer.declareModel()));
+ SpDataStream stream = new SpDataStream(streamDec.declareModel());
stream.setDOM(domId);
return stream;
}
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/TestSerializer.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/TestSerializer.java
deleted file mode 100644
index e9e9b5a..0000000
--- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/pipeline/TestSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.pipeline;
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.serializers.json.GsonSerializer;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class TestSerializer {
-
- public static void main(String[] args) {
- List<DataSourceDescription> sep = StorageDispatcher.INSTANCE.getTripleStore().getPipelineElementStorage().getAllDataSources().stream
- ().map(m -> new DataSourceDescription(m)).collect(Collectors.toList());
-
- String json = GsonSerializer.getGson().toJson(sep.get(0));
- System.out.println(json);
-
- List<DataProcessorDescription> processors = StorageDispatcher.INSTANCE.getTripleStore().getPipelineElementStorage()
- .getAllDataProcessors().stream().map(m -> new DataProcessorDescription(m)).collect(Collectors.toList());
- String json2 = GsonSerializer.getGson().toJson(processors.get(0));
- System.out.println(json2);
-
- DataSourceDescription description = GsonSerializer.getGson().fromJson(json, DataSourceDescription.class);
- System.out.println(description.getName());
-
- DataProcessorDescription processor2 = GsonSerializer.getGson().fromJson(json2, DataProcessorDescription.class);
- System.out.println(processor2.getName());
- }
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ApplicationLink.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ApplicationLink.java
deleted file mode 100644
index 82d80c7..0000000
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ApplicationLink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.rest.impl;
-
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.rest.api.IApplicationLink;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-@Path("/v2/applink")
-public class ApplicationLink extends AbstractRestInterface implements IApplicationLink {
-
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Override
- public Response getApplicationLinks() {
- return ok(generateAppLinks());
- }
-
- private List<org.apache.streampipes.model.ApplicationLink> generateAppLinks() {
- List<NamedStreamPipesEntity> allElements = new ArrayList<>();
- List<org.apache.streampipes.model.ApplicationLink> allApplicationLinks = new ArrayList<>();
-
- allElements.addAll(getPipelineElementRdfStorage()
- .getAllDataProcessors().stream().map(e -> new DataProcessorDescription(e)).collect(Collectors.toList()));
- allElements.addAll(getPipelineElementRdfStorage()
- .getAllDataSinks().stream().map(e -> new DataSinkDescription(e)).collect(Collectors.toList()));
- allElements.addAll(getPipelineElementRdfStorage()
- .getAllDataSources().stream().map(e -> new DataSourceDescription(e)).collect(Collectors.toList()));
-
- allElements.stream().forEach(e -> allApplicationLinks.addAll(removeDuplicates(allApplicationLinks, e.getApplicationLinks())));
-
- return allApplicationLinks;
- }
-
- private List<org.apache.streampipes.model.ApplicationLink> removeDuplicates(List<org.apache.streampipes.model.ApplicationLink> allApplicationLinks,
- List<org.apache.streampipes.model.ApplicationLink> applicationLinks) {
- List<org.apache.streampipes.model.ApplicationLink> result = new ArrayList<>();
-
- applicationLinks.forEach( a -> {
- if (allApplicationLinks
- .stream()
- .noneMatch(existing -> existing.getApplicationUrl()
- .equals(existing.getApplicationUrl()))) {
- result.add(a);
- }
- });
-
- return result;
-
- }
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/InternalPipelineTemplates.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/InternalPipelineTemplates.java
index 12435d5..ed9478a 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/InternalPipelineTemplates.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/InternalPipelineTemplates.java
@@ -18,14 +18,11 @@
package org.apache.streampipes.rest.impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.model.template.PipelineTemplateInvocation;
import org.apache.streampipes.rest.api.InternalPipelineTemplate;
@@ -33,22 +30,18 @@ import org.apache.streampipes.sdk.builder.BoundPipelineElementBuilder;
import org.apache.streampipes.sdk.builder.PipelineTemplateBuilder;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
@Path("/v2/users/{username}/internal-pipelines")
public class InternalPipelineTemplates extends AbstractRestInterface implements InternalPipelineTemplate {
@@ -115,14 +108,7 @@ public class InternalPipelineTemplates extends AbstractRestInterface implements
}
private List<SpDataStream> getAllDataStreams() {
- List<DataSourceDescription> sources = getPipelineElementRdfStorage().getAllDataSources();
- List<SpDataStream> datasets = new ArrayList<>();
- for (DataSourceDescription source : sources) {
- datasets.addAll(source
- .getSpDataStreams());
- }
-
- return datasets;
+ return getPipelineElementRdfStorage().getAllDataStreams();
}
private SpDataStream getLogDataStream() {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/OntologyPipelineElement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/OntologyPipelineElement.java
index 20bd7b9..7d72dd9 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/OntologyPipelineElement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/OntologyPipelineElement.java
@@ -21,9 +21,9 @@ package org.apache.streampipes.rest.impl;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.rest.api.IOntologyPipelineElement;
import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
import org.apache.streampipes.storage.management.StorageManager;
@@ -43,11 +43,11 @@ public class OntologyPipelineElement extends AbstractRestInterface implements IO
@GsonWithIds
@Produces(MediaType.APPLICATION_JSON)
public Response getStreams() {
- List<DataSourceDescription> result = new ArrayList<>();
- List<DataSourceDescription> sesameSeps = StorageManager.INSTANCE.getPipelineElementStorage().getAllDataSources();
+ List<SpDataStream> result = new ArrayList<>();
+ List<SpDataStream> sesameSeps = StorageManager.INSTANCE.getPipelineElementStorage().getAllDataStreams();
- for (DataSourceDescription sep : sesameSeps) {
- result.add(new DataSourceDescription(sep));
+ for (SpDataStream sep : sesameSeps) {
+ result.add(new SpDataStream(sep));
}
return ok(result);
}
@@ -58,7 +58,7 @@ public class OntologyPipelineElement extends AbstractRestInterface implements IO
@Produces(MediaType.APPLICATION_JSON)
public Response getSourceDetails(@PathParam("sourceId") String sepaId, @QueryParam("keepIds") boolean keepIds) {
- DataSourceDescription sepaDescription = new DataSourceDescription(StorageManager.INSTANCE.getPipelineElementStorage().getDataSourceById(sepaId));
+ SpDataStream sepaDescription = new SpDataStream(StorageManager.INSTANCE.getPipelineElementStorage().getDataStreamById(sepaId));
return ok(sepaDescription);
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementCategory.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementCategory.java
index 06aee1d..f86754d 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementCategory.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementCategory.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.rest.impl;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.client.Category;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.rest.api.IPipelineElementCategory;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.storage.management.StorageManager;
@@ -44,7 +44,7 @@ public class PipelineElementCategory extends AbstractRestInterface implements IP
@JacksonSerialized
@Override
public Response getEps() {
- return ok(makeCategories(StorageManager.INSTANCE.getPipelineElementStorage().getAllDataSources()));
+ return ok(makeCategories(StorageManager.INSTANCE.getPipelineElementStorage().getAllDataStreams()));
}
@GET
@@ -74,8 +74,8 @@ public class PipelineElementCategory extends AbstractRestInterface implements IP
return ok(DataSinkType.values());
}
- private List<Category> makeCategories(List<DataSourceDescription> producers) {
- return producers
+ private List<Category> makeCategories(List<SpDataStream> streams) {
+ return streams
.stream()
.map(p -> new Category(p.getElementId(), p.getName(), p.getDescription()))
.collect(Collectors.toList());
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementImport.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementImport.java
index 807c030..c8a580e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementImport.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementImport.java
@@ -116,9 +116,9 @@ public class PipelineElementImport extends AbstractRestInterface {
requestor.deleteDataProcessor(requestor.getDataProcessorById(elementId));
userService.deleteOwnSepa(username, elementId);
requestor.refreshDataProcessorCache();
- } else if (requestor.existsDataSource(elementId)) {
- appId = requestor.getDataSourceById(elementId).getAppId();
- requestor.deleteDataSource(requestor.getDataSourceById(elementId));
+ } else if (requestor.existsDataStream(elementId)) {
+ appId = requestor.getDataStreamById(elementId).getAppId();
+ requestor.deleteDataStream(requestor.getDataStreamById(elementId));
userService.deleteOwnSource(username, elementId);
requestor.refreshDataSourceCache();
} else if (requestor.existsDataSink(elementId)) {
@@ -146,8 +146,8 @@ public class PipelineElementImport extends AbstractRestInterface {
elementId = decode(elementId);
if (requestor.getDataProcessorById(elementId) != null) {
return ok(toJsonLd(requestor.getDataProcessorById(elementId)));
- } else if (requestor.getDataSourceById(elementId) != null) {
- return ok(toJsonLd(requestor.getDataSourceById(elementId)));
+ } else if (requestor.getDataStreamById(elementId) != null) {
+ return ok(toJsonLd(requestor.getDataStreamById(elementId)));
} else if (requestor.getDataSinkById(elementId) != null) {
return ok(toJsonLd(requestor.getDataSinkById(elementId)));
} else {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
index 143ed53..07126d4 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.SpDataStreamContainer;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.model.template.PipelineTemplateDescriptionContainer;
import org.apache.streampipes.model.template.PipelineTemplateInvocation;
@@ -31,19 +30,13 @@ import org.apache.streampipes.rest.shared.util.SpMediaType;
import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
import org.apache.streampipes.vocabulary.StreamPipes;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
@Path("/v2/users/{username}/pipeline-templates")
public class PipelineTemplate extends AbstractRestInterface implements IPipelineTemplate {
@@ -52,17 +45,13 @@ public class PipelineTemplate extends AbstractRestInterface implements IPipeline
@Produces(MediaType.APPLICATION_JSON)
@Override
public Response getAvailableDataStreams() {
- List<DataSourceDescription> sources = getPipelineElementRdfStorage().getAllDataSources();
+ List<SpDataStream> sources = getPipelineElementRdfStorage().getAllDataStreams();
List<SpDataStream> datasets = new ArrayList<>();
- for (DataSourceDescription source : sources) {
- source
- .getSpDataStreams()
- .stream()
+ sources.stream()
.filter(stream -> !(stream instanceof SpDataSet))
.map(SpDataStream::new)
.forEach(datasets::add);
- }
return ok(toJsonLd(new SpDataStreamContainer(datasets)));
}
@@ -73,17 +62,14 @@ public class PipelineTemplate extends AbstractRestInterface implements IPipeline
@Override
public Response getAvailableDataSets() {
- List<DataSourceDescription> sources = getPipelineElementRdfStorage().getAllDataSources();
+ List<SpDataStream> sources = getPipelineElementRdfStorage().getAllDataStreams();
List<SpDataStream> datasets = new ArrayList<>();
- for (DataSourceDescription source : sources) {
- source
- .getSpDataStreams()
+ sources
.stream()
.filter(stream -> stream instanceof SpDataSet)
.map(stream -> new SpDataSet((SpDataSet) stream))
.forEach(set -> datasets.add((SpDataSet) set));
- }
return ok(toJsonLd(new SpDataStreamContainer(datasets)));
}
@@ -127,15 +113,7 @@ public class PipelineTemplate extends AbstractRestInterface implements IPipeline
}
private List<SpDataStream> getAllDataStreams() {
- List<DataSourceDescription> sources = getPipelineElementRdfStorage().getAllDataSources();
- List<SpDataStream> datasets = new ArrayList<>();
-
- for (DataSourceDescription source : sources) {
- datasets.addAll(source
- .getSpDataStreams());
- }
-
- return datasets;
+ return getPipelineElementRdfStorage().getAllDataStreams();
}
private SpDataStream getDataStream(String streamId) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
index 4371298..0bc2ca6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.rest.impl;
import org.apache.streampipes.manager.endpoint.EndpointFetcher;
import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
import org.apache.streampipes.rest.api.IRdfEndpoint;
@@ -82,7 +83,7 @@ public class RdfEndpoint extends AbstractRestInterface implements IRdfEndpoint {
items.forEach(item -> item.setInstalled(isInstalled(item.getUri(), username)));
// also add installed elements that are currently not running or available
- items.addAll(getAllDataSourceEndpoints(username, items));
+ items.addAll(getAllDataStreamEndpoints(username, items));
items.addAll(getAllDataProcessorEndpoints(username, items));
items.addAll(getAllDataSinkEndpoints(username, items));
@@ -102,22 +103,18 @@ public class RdfEndpoint extends AbstractRestInterface implements IRdfEndpoint {
private List<String> getAllUserElements(String username) {
List<String> elementUris = new ArrayList<>();
- elementUris.addAll(getAllDataSourceUris(username));
+ elementUris.addAll(getAllDataStreamUris(username));
elementUris.addAll(getAllDataProcessorUris(username));
elementUris.addAll(getAllDataSinkUris(username));
return elementUris;
}
- private List<RdfEndpointItem> getAllDataSourceEndpoints(String username, List<RdfEndpointItem> existingItems) {
- return getAllDataSourceUris(username)
+ private List<RdfEndpointItem> getAllDataStreamEndpoints(String username, List<RdfEndpointItem> existingItems) {
+ return getAllDataStreamUris(username)
.stream()
.filter(s -> existingItems.stream().noneMatch(item -> item.getUri().equals(s)))
- .map(s -> getTripleStorage().getPipelineElementStorage().getDataSourceById(s))
- .map(source -> {
- RdfEndpointItem sourceItem = makeItem(source, "source");
- sourceItem.setStreams(source.getSpDataStreams().stream().map(stream -> makeItem(stream, "stream")).collect(Collectors.toList()));
- return sourceItem;
- })
+ .map(s -> getTripleStorage().getPipelineElementStorage().getDataStreamById(s))
+ .map(stream -> makeItem(stream, stream instanceof SpDataSet ? "set" : "stream"))
.collect(Collectors.toList());
}
@@ -147,10 +144,11 @@ public class RdfEndpoint extends AbstractRestInterface implements IRdfEndpoint {
endpoint.setAppId(entity.getAppId());
endpoint.setType(type);
endpoint.setUri(entity.getElementId());
+ endpoint.setEditable(!(entity.isInternallyManaged()));
return endpoint;
}
- private List<String> getAllDataSourceUris(String username) {
+ private List<String> getAllDataStreamUris(String username) {
return getUserService().getOwnSourceUris(username);
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/SemanticEventProducer.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/SemanticEventProducer.java
index bd68c36..a7c56b3 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/SemanticEventProducer.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/SemanticEventProducer.java
@@ -19,10 +19,10 @@
package org.apache.streampipes.rest.impl;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.model.message.Notification;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.rest.api.IPipelineElement;
import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -35,24 +35,9 @@ import javax.ws.rs.core.Response;
import java.util.List;
import java.util.stream.Collectors;
-@Path("/v2/users/{username}/sources")
+@Path("/v2/users/{username}/streams")
public class SemanticEventProducer extends AbstractRestInterface implements IPipelineElement {
- @Path("/{sourceId}/streams")
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @GsonWithIds
- public Response getStreamsBySource(@PathParam("username") String username, @PathParam("sourceId") String sourceId)
- {
- try {
- return ok(new DataSourceDescription(getPipelineElementRdfStorage().getDataSourceById(sourceId)));
- } catch (Exception e) {
- return constructErrorMessage(new Notification(NotificationType.UNKNOWN_ERROR.title(),
- NotificationType.UNKNOWN_ERROR.description(), e.getMessage()));
- }
-
- }
-
@GET
@Path("/available")
@RequiresAuthentication
@@ -60,7 +45,7 @@ public class SemanticEventProducer extends AbstractRestInterface implements IPip
@GsonWithIds
@Override
public Response getAvailable(@PathParam("username") String username) {
- List<DataSourceDescription> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataSources(),
+ List<SpDataStream> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataStreams(),
getUserService().getAvailableSourceUris(username));
return ok(seps);
}
@@ -72,7 +57,7 @@ public class SemanticEventProducer extends AbstractRestInterface implements IPip
@GsonWithIds
@Override
public Response getFavorites(@PathParam("username") String username) {
- List<DataSourceDescription> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataSources(),
+ List<SpDataStream> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataStreams(),
getUserService().getFavoriteSourceUris(username));
return ok(seps);
}
@@ -84,9 +69,9 @@ public class SemanticEventProducer extends AbstractRestInterface implements IPip
@JacksonSerialized
@Override
public Response getOwn(@PathParam("username") String username) {
- List<DataSourceDescription> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataSources(),
+ List<SpDataStream> seps = Filter.byUri(getPipelineElementRdfStorage().getAllDataStreams(),
getUserService().getOwnSourceUris(username));
- List<DataSourceDescription> si = seps.stream().map(s -> new DataSourceDescription(s)).collect(Collectors.toList());
+ List<SpDataStream> si = seps.stream().map(s -> new Cloner().mapSequence(s)).collect(Collectors.toList());
return ok(si);
}
@@ -141,7 +126,7 @@ public class SemanticEventProducer extends AbstractRestInterface implements IPip
@Override
public Response getElement(@PathParam("username") String username, @PathParam("elementUri") String elementUri) {
// TODO Access rights
- return ok(new DataSourceDescription(getPipelineElementRdfStorage().getDataSourceById(elementUri)));
+ return ok(new Cloner().mapSequence(getPipelineElementRdfStorage().getDataStreamById(elementUri)));
}
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
index 54b23f8..614e83e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
@@ -65,8 +65,8 @@ public class PipelineElementImportNoUser extends AbstractRestInterface {
logger.info("User " + username + " deletes element with URI: " + uri + " from triplestore");
- if (requestor.getDataSourceById(uri) != null) {
- requestor.deleteDataSource(requestor.getDataSourceById(uri));
+ if (requestor.getDataStreamById(uri) != null) {
+ requestor.deleteDataStream(requestor.getDataStreamById(uri));
userService.deleteOwnSource(username, uri);
requestor.refreshDataSourceCache();
} else {
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IPipelineElementDescriptionStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IPipelineElementDescriptionStorage.java
index 32bd0c2..830c4a0 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IPipelineElementDescriptionStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IPipelineElementDescriptionStorage.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import java.net.URI;
@@ -32,19 +31,19 @@ public interface IPipelineElementDescriptionStorage {
boolean storeInvocablePipelineElement(InvocableStreamPipesEntity element);
- boolean storeDataSource(DataSourceDescription sep);
+ boolean storeDataStream(SpDataStream stream);
- boolean storeDataSource(String jsonld);
+ boolean storeDataStream(String jsonld);
- boolean storeDataProcessor(DataProcessorDescription sepa);
+ boolean storeDataProcessor(DataProcessorDescription processorDescription);
boolean storeDataProcessor(String jsonld);
- DataSourceDescription getDataSourceById(URI rdfId);
+ SpDataStream getDataStreamById(URI rdfId);
- DataSourceDescription getDataSourceByAppId(String appId);
+ SpDataStream getDataStreamByAppId(String appId);
- DataSourceDescription getDataSourceById(String rdfId);
+ SpDataStream getDataStreamById(String rdfId);
DataProcessorDescription getDataProcessorById(String rdfId);
@@ -58,31 +57,31 @@ public interface IPipelineElementDescriptionStorage {
DataSinkDescription getDataSinkByAppId(String appId);
- List<DataSourceDescription> getAllDataSources();
+ List<SpDataStream> getAllDataStreams();
List<DataProcessorDescription> getAllDataProcessors();
- boolean deleteDataSource(DataSourceDescription sep);
+ boolean deleteDataStream(SpDataStream sep);
- boolean deleteDataSource(String rdfId);
+ boolean deleteDataStream(String rdfId);
- boolean deleteDataProcessor(DataProcessorDescription sepa);
+ boolean deleteDataProcessor(DataProcessorDescription processorDescription);
boolean deleteDataProcessor(String rdfId);
- boolean exists(DataSourceDescription sep);
+ boolean exists(SpDataStream stream);
- boolean exists(DataProcessorDescription sepa);
+ boolean exists(DataProcessorDescription processorDescription);
boolean existsDataProcessor(String elementId);
- boolean existsDataSource(String elementId);
+ boolean existsDataStream(String elementId);
boolean existsDataSink(String elementId);
- boolean update(DataSourceDescription sep);
+ boolean update(SpDataStream stream);
- boolean update(DataProcessorDescription sepa);
+ boolean update(DataProcessorDescription processorDescription);
boolean exists(DataSinkDescription sec);
diff --git a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementInMemoryStorage.java b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementInMemoryStorage.java
index d8478cd..469ac32 100644
--- a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementInMemoryStorage.java
+++ b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementInMemoryStorage.java
@@ -24,46 +24,39 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
public class PipelineElementInMemoryStorage implements IPipelineElementDescriptionStorageCache {
private Map<String, DataSinkDescription> inMemoryDataSinkStorage;
- private Map<String, DataSourceDescription> inMemoryDataSourceStorage;
private Map<String, DataProcessorDescription> inMemoryDataProcessorStorage;
- private Map<String, SpDataStream> inMemoryEventStreamStorage;
+ private Map<String, SpDataStream> inMemoryDataStreamStorage;
private IPipelineElementDescriptionStorage sesameStorage;
public PipelineElementInMemoryStorage(IPipelineElementDescriptionStorage sesameStorage) {
this.inMemoryDataSinkStorage = new HashMap<>();
this.inMemoryDataProcessorStorage = new HashMap<>();
- this.inMemoryDataSourceStorage = new HashMap<>();
- this.inMemoryEventStreamStorage = new HashMap<>();
+ this.inMemoryDataStreamStorage = new HashMap<>();
this.sesameStorage = sesameStorage;
init();
}
private void init() {
- initializeSECStorage();
- initializeSEPAStorage();
- initializeSEPStorage();
+ initializeDataSinkStorage();
+ initializeDataProcessorStorage();
+ initializeDataStreamStorage();
}
- private void initializeSECStorage() {
+ private void initializeDataSinkStorage() {
inMemoryDataSinkStorage.clear();
List<DataSinkDescription> secs = sort(sesameStorage
.getAllDataSinks()
@@ -73,7 +66,7 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
secs.forEach(sec -> inMemoryDataSinkStorage.put(sec.getElementId(), sec));
}
- private void initializeSEPAStorage() {
+ private void initializeDataProcessorStorage() {
inMemoryDataProcessorStorage.clear();
List<DataProcessorDescription> sepas = sort(sesameStorage
.getAllDataProcessors()
@@ -83,17 +76,14 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
sepas.forEach(sepa -> inMemoryDataProcessorStorage.put(sepa.getElementId(), sepa));
}
- private void initializeSEPStorage() {
- inMemoryDataSourceStorage.clear();
- List<DataSourceDescription> seps = sesameStorage.getAllDataSources();
- seps.forEach(sep ->
- sep.getSpDataStreams().forEach(es ->
- es.getEventSchema()
+ private void initializeDataStreamStorage() {
+ inMemoryDataStreamStorage.clear();
+ List<SpDataStream> streams = sesameStorage.getAllDataStreams();
+ streams.forEach(stream ->
+ stream.getEventSchema()
.getEventProperties()
- .sort(Comparator.comparingInt(EventProperty::getIndex))));
- seps.forEach(sep -> inMemoryDataSourceStorage.put(sep.getElementId(), sep));
- seps.forEach(sep -> sep.getSpDataStreams().forEach(eventStream -> inMemoryEventStreamStorage.put(eventStream.getElementId(),
- eventStream)));
+ .sort(Comparator.comparingInt(EventProperty::getIndex)));
+ streams.forEach(stream -> inMemoryDataStreamStorage.put(stream.getElementId(), stream));
}
private <T extends ConsumableStreamPipesEntity> List<T> sort(List<T> processingElements) {
@@ -110,13 +100,13 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
}
@Override
- public boolean storeDataSource(DataSourceDescription sep) {
- return sesameStorage.storeDataSource(sep);
+ public boolean storeDataStream(SpDataStream stream) {
+ return sesameStorage.storeDataStream(stream);
}
@Override
- public boolean storeDataSource(String jsonld) {
- return sesameStorage.storeDataSource(jsonld);
+ public boolean storeDataStream(String jsonld) {
+ return sesameStorage.storeDataStream(jsonld);
}
@Override
@@ -126,22 +116,22 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
@Override
public boolean storeDataProcessor(String jsonld) {
- return sesameStorage.storeDataSource(jsonld);
+ return sesameStorage.storeDataProcessor(jsonld);
}
@Override
- public DataSourceDescription getDataSourceById(URI rdfId) {
- return new DataSourceDescription(inMemoryDataSourceStorage.get(rdfId.toString()));
+ public SpDataStream getDataStreamById(URI rdfId) {
+ return new Cloner().mapSequence(inMemoryDataStreamStorage.get(rdfId.toString()));
}
@Override
- public DataSourceDescription getDataSourceByAppId(String appId) {
- return new DataSourceDescription(getByAppId(inMemoryDataSourceStorage, appId));
+ public SpDataStream getDataStreamByAppId(String appId) {
+ return new Cloner().mapSequence(getByAppId(inMemoryDataStreamStorage, appId));
}
@Override
- public DataSourceDescription getDataSourceById(String rdfId) {
- return new DataSourceDescription(inMemoryDataSourceStorage.get(rdfId));
+ public SpDataStream getDataStreamById(String rdfId) {
+ return new Cloner().mapSequence(inMemoryDataStreamStorage.get(rdfId));
}
@Override
@@ -186,8 +176,8 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
}
@Override
- public List<DataSourceDescription> getAllDataSources() {
- return new ArrayList<>(inMemoryDataSourceStorage.values());
+ public List<SpDataStream> getAllDataStreams() {
+ return new ArrayList<>(inMemoryDataStreamStorage.values());
}
@Override
@@ -196,43 +186,43 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
}
@Override
- public boolean deleteDataSource(DataSourceDescription sep) {
- boolean success = sesameStorage.deleteDataSource(sep);
- initializeSEPStorage();
+ public boolean deleteDataStream(SpDataStream sep) {
+ boolean success = sesameStorage.deleteDataStream(sep);
+ initializeDataStreamStorage();
return success;
}
@Override
- public boolean deleteDataSource(String rdfId) {
- boolean success = sesameStorage.deleteDataSource(rdfId);
- initializeSEPStorage();
+ public boolean deleteDataStream(String rdfId) {
+ boolean success = sesameStorage.deleteDataStream(rdfId);
+ initializeDataStreamStorage();
return success;
}
@Override
public boolean deleteDataSink(String rdfId) {
boolean success = sesameStorage.deleteDataSink(rdfId);
- initializeSECStorage();
+ initializeDataSinkStorage();
return success;
}
@Override
public boolean deleteDataProcessor(DataProcessorDescription sepa) {
boolean success = sesameStorage.deleteDataProcessor(sepa);
- initializeSEPAStorage();
+ initializeDataProcessorStorage();
return success;
}
@Override
public boolean deleteDataProcessor(String rdfId) {
- boolean success = sesameStorage.deleteDataSource(rdfId);
- initializeSEPAStorage();
+ boolean success = sesameStorage.deleteDataProcessor(rdfId);
+ initializeDataProcessorStorage();
return success;
}
@Override
- public boolean exists(DataSourceDescription sep) {
- return inMemoryDataSourceStorage.containsKey(sep.getElementId());
+ public boolean exists(SpDataStream stream) {
+ return inMemoryDataStreamStorage.containsKey(stream.getElementId());
}
@Override
@@ -246,8 +236,8 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
}
@Override
- public boolean existsDataSource(String elementId) {
- return inMemoryDataSourceStorage.containsKey(elementId);
+ public boolean existsDataStream(String elementId) {
+ return inMemoryDataStreamStorage.containsKey(elementId);
}
@Override
@@ -256,16 +246,16 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
}
@Override
- public boolean update(DataSourceDescription sep) {
+ public boolean update(SpDataStream sep) {
boolean success = sesameStorage.update(sep);
- initializeSEPStorage();
+ initializeDataStreamStorage();
return success;
}
@Override
public boolean update(DataProcessorDescription sepa) {
boolean success = sesameStorage.update(sepa);
- initializeSEPAStorage();
+ initializeDataProcessorStorage();
return success;
}
@@ -277,14 +267,14 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
@Override
public boolean update(DataSinkDescription sec) {
boolean success = sesameStorage.update(sec);
- initializeSECStorage();
+ initializeDataSinkStorage();
return success;
}
@Override
public boolean deleteDataSink(DataSinkDescription sec) {
boolean success = sesameStorage.deleteDataSink(sec);
- initializeSECStorage();
+ initializeDataSinkStorage();
return success;
}
@@ -305,21 +295,21 @@ public class PipelineElementInMemoryStorage implements IPipelineElementDescripti
@Override
public SpDataStream getEventStreamById(String rdfId) {
- return inMemoryEventStreamStorage.get(rdfId);
+ return inMemoryDataStreamStorage.get(rdfId);
}
@Override
public void refreshDataProcessorCache() {
- this.initializeSEPAStorage();
+ this.initializeDataProcessorStorage();
}
@Override
public void refreshDataSinkCache() {
- this.initializeSECStorage();
+ this.initializeDataSinkStorage();
}
@Override
public void refreshDataSourceCache() {
- this.initializeSEPStorage();
+ this.initializeDataStreamStorage();
}
}
diff --git a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementStorageRequests.java b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementStorageRequests.java
index c1c4ee8..ca134bf 100644
--- a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementStorageRequests.java
+++ b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/impl/PipelineElementStorageRequests.java
@@ -19,12 +19,12 @@
package org.apache.streampipes.storage.rdf4j.impl;
import io.fogsy.empire.core.empire.impl.RdfQuery;
+import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.rdf4j.sparql.QueryBuilder;
@@ -50,7 +50,7 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
//TODO: exception handling
@Override
- public boolean storeDataSource(DataSourceDescription sep) {
+ public boolean storeDataStream(SpDataStream sep) {
if (exists(sep)) {
return false;
}
@@ -59,11 +59,11 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public boolean storeDataSource(String jsonld) {
- DataSourceDescription sep;
+ public boolean storeDataStream(String jsonld) {
+ SpDataStream sep;
try {
- sep = Transformer.fromJsonLd(DataSourceDescription.class, jsonld);
- return storeDataSource(sep);
+ sep = Transformer.fromJsonLd(SpDataStream.class, jsonld);
+ return storeDataStream(sep);
} catch (RDFParseException | IOException | RepositoryException | UnsupportedRDFormatException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -86,8 +86,8 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public boolean existsDataSource(String rdfId) {
- return entityManager.find(DataSourceDescription.class, rdfId) != null;
+ public boolean existsDataStream(String rdfId) {
+ return entityManager.find(SpDataStream.class, rdfId) != null;
}
@Override
@@ -109,25 +109,33 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public DataSourceDescription getDataSourceById(URI rdfId) {
- return entityManager.find(DataSourceDescription.class, rdfId);
+ public SpDataStream getDataStreamById(URI rdfId) {
+ return entityManager.find(SpDataStream.class, rdfId);
}
@Override
- public DataSourceDescription getDataSourceByAppId(String appId) {
- return getByAppId(getAllDataSources(), appId);
+ public SpDataStream getDataStreamByAppId(String appId) {
+ return getByAppId(getAllDataStreams(), appId);
}
@Override
- public DataSourceDescription getDataSourceById(String rdfId) {
- return getDataSourceById(URI.create(rdfId));
+ public SpDataStream getDataStreamById(String rdfId) {
+ return getDataStreamById(URI.create(rdfId));
}
@SuppressWarnings("unchecked")
@Override
- public List<DataSourceDescription> getAllDataSources() {
- Query query = entityManager.createQuery(QueryBuilder.buildListSEPQuery());
- query.setHint(RdfQuery.HINT_ENTITY_CLASS, DataSourceDescription.class);
+ public List<SpDataStream> getAllDataStreams() {
+ Query query = entityManager.createQuery(QueryBuilder.buildListDataStreamQuery());
+ query.setHint(RdfQuery.HINT_ENTITY_CLASS, SpDataStream.class);
+ List<SpDataStream> allStreams = query.getResultList();
+ allStreams.addAll(getAllDataSets());
+ return allStreams;
+ }
+
+ private List<SpDataSet> getAllDataSets() {
+ Query query = entityManager.createQuery(QueryBuilder.buildListDataSetQuery());
+ query.setHint(RdfQuery.HINT_ENTITY_CLASS, SpDataSet.class);
return query.getResultList();
}
@@ -140,14 +148,14 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public boolean deleteDataSource(DataSourceDescription sep) {
- deleteDataSource(sep.getElementId());
+ public boolean deleteDataStream(SpDataStream stream) {
+ deleteDataStream(stream.getElementId());
return true;
}
@Override
- public boolean deleteDataSource(String rdfId) {
- DataSourceDescription sep = entityManager.find(DataSourceDescription.class, rdfId);
+ public boolean deleteDataStream(String rdfId) {
+ SpDataStream sep = entityManager.find(SpDataStream.class, rdfId);
entityManager.remove(sep);
return true;
}
@@ -166,8 +174,8 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public boolean exists(DataSourceDescription sep) {
- DataSourceDescription storedSEP = entityManager.find(DataSourceDescription.class, sep.getElementId());
+ public boolean exists(SpDataStream sep) {
+ SpDataStream storedSEP = entityManager.find(SpDataStream.class, sep.getElementId());
return storedSEP != null;
}
@@ -178,8 +186,8 @@ public class PipelineElementStorageRequests implements IPipelineElementDescripti
}
@Override
- public boolean update(DataSourceDescription sep) {
- return deleteDataSource(sep) && storeDataSource(sep);
+ public boolean update(SpDataStream sep) {
+ return deleteDataStream(sep) && storeDataStream(sep);
}
@Override
diff --git a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/sparql/QueryBuilder.java b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/sparql/QueryBuilder.java
index 0421ea4..7960ad3 100644
--- a/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/sparql/QueryBuilder.java
+++ b/streampipes-storage-rdf4j/src/main/java/org/apache/streampipes/storage/rdf4j/sparql/QueryBuilder.java
@@ -51,11 +51,12 @@ public class QueryBuilder {
public static final String SO_DOMAIN_INCLUDES = "http://schema.org/domainIncludes";
public static final String SO_RANGE_INCLUDES = "http://schema.org/rangeIncludes";
- public static String buildListSEPQuery() {
- StringBuilder builder = new StringBuilder();
- builder.append("where { ?result rdf:type sp:DataSourceDescription. }");
+ public static String buildListDataStreamQuery() {
+ return "where { ?result rdf:type sp:DataStream. }";
+ }
- return builder.toString();
+ public static String buildListDataSetQuery() {
+ return "where { ?result rdf:type sp:DataSet. }";
}
private static String getPrefix() {
@@ -81,22 +82,6 @@ public class QueryBuilder {
return builder.toString();
}
- public static String buildSEPByDomainQuery(String domain) {
- StringBuilder builder = new StringBuilder();
- builder.append("where { ?result rdf:type sp:DataSourceDescription. ?result sp:hasDomain '"
- + domain + "'^^xsd:string }");
-
- return builder.toString();
- }
-
- public static String buildSEPAByDomainQuery(String domain) {
- StringBuilder builder = new StringBuilder();
- builder.append("where { ?result rdf:type sp:DataProcessorDescription. ?result sp:hasDomain '"
- + domain + "'^^xsd:string }");
-
- return builder.toString();
- }
-
public static String buildListSEPAQuery() {
StringBuilder builder = new StringBuilder();
builder.append("where { ?result rdf:type sp:DataProcessorDescription. }");
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 171a058..d269543 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -402,5 +402,7 @@ public class StreamPipes {
public static final String USER_DEFINED_OUTPUT_STRATEGY = NS + "UserDefinedOutputStrategy";
public static final String PE_CONFIGURED = NS + "isPeConfigured" ;
- public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes" ;
+ public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes";
+ public static final String IS_INTERNALLY_MANAGED = NS + "isInternallyManaged";
+ public static final String HAS_CORRESPONDING_ADAPTER_ID = NS + "hasCorrespondingAdapterId";
}
diff --git a/ui/src/app/add/add.component.html b/ui/src/app/add/add.component.html
index bb37be7..15114d2 100644
--- a/ui/src/app/add/add.component.html
+++ b/ui/src/app/add/add.component.html
@@ -27,7 +27,8 @@
<mat-tab-group [selectedIndex]="selectedCategoryIndex"
(selectedIndexChange)="setSelectedTab($event)">
<mat-tab label="All"></mat-tab>
- <mat-tab label="Data Sources"></mat-tab>
+ <mat-tab label="Data Sets"></mat-tab>
+ <mat-tab label="Data Streams"></mat-tab>
<mat-tab label="Data Processors"></mat-tab>
<mat-tab label="Data Sinks"></mat-tab>
</mat-tab-group>
diff --git a/ui/src/app/add/add.component.ts b/ui/src/app/add/add.component.ts
index 5214213..38fca8d 100644
--- a/ui/src/app/add/add.component.ts
+++ b/ui/src/app/add/add.component.ts
@@ -37,7 +37,7 @@ export class AddComponent implements OnInit {
endpointItems: any[];
endpointItemsLoadingComplete: boolean;
selectedTab: string;
- availableTypes: Array<string> = ["all", "source", "sepa", "action"];
+ availableTypes: Array<string> = ["all", "set", "stream", "sepa", "action"];
selectedCategoryIndex: number = 0;
@@ -192,4 +192,4 @@ export class AddComponent implements OnInit {
get selectedInstallationStatus(): string {
return this._selectedInstallationStatus;
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/add/components/endpoint-item/endpoint-item.component.html b/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
index 33897dd..2689d12 100644
--- a/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
+++ b/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
@@ -26,17 +26,19 @@
<div fxFlex fxLayoutAlign="end start">
<span class="{{itemTypeStyle}}">{{itemTypeTitle}}</span>
</div>
+ <div class="ml-5"><small *ngIf="!(item.editable)">Internally managed by StreamPipes</small></div>
<div fxFlex fxLayoutAlign="start end" class="ml-5">
- <button class="small-button-add" mat-button mat-raised-button color="primary"
+ <button class="small-button-add" mat-button mat-raised-button color="primary" [disabled]="!(item.editable)"
(click)="installSingleElement($event, item)" *ngIf="!item.installed">
<span> Install</span>
</button>
- <button class="small-button-add-inverted" mat-button mat-raised-button
+ <button class="small-button-add-inverted" mat-button mat-raised-button [disabled]="!(item.editable)"
(click)="uninstallSingleElement($event, item)" *ngIf="item.installed">
<span> Uninstall</span>
</button>
<div *ngIf="item.installed" style="margin-left:5px;">
- <button class="small-button-add mat-basic no-min-width" mat-raised-button mat-button [matMenuTriggerFor]="menu"><span style="font-size:12px;">...</span></button>
+ <button class="small-button-add mat-basic no-min-width" [disabled]="!(item.editable)"
+ mat-raised-button mat-button [matMenuTriggerFor]="menu"><span style="font-size:12px;">...</span></button>
<mat-menu #menu="matMenu">
<button mat-menu-item (click)="refresh(item.uri)">
<mat-icon>refresh</mat-icon>
@@ -52,20 +54,7 @@
<div fxFlex fxLayout="column">
<h3><b>{{item.name}}</b></h3>
<p>{{item.description}}</p>
- <div fxFlex fxLayout="column" *ngIf="item.type=='source'">
- <div fxLayout="row" fxLayoutAlign="start center" *ngFor="let stream of item.streams">
- <div style="min-height:60px;">
- <div class="draggable-icon-preview stream" style="margin-right:10px;">
- {{iconText(stream.name)}}
- </div>
- </div>
- <div style="min-height:60px;margin-left:10px;">
- <h5><b>{{stream.name}}</b></h5>
- <p>{{stream.description}}</p>
- </div>
- </div>
- </div>
</div>
</div>
</div>
-</div>
\ No newline at end of file
+</div>
diff --git a/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts b/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
index e6c1f92..b20d25a 100644
--- a/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
+++ b/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
@@ -70,8 +70,10 @@ export class EndpointItemComponent implements OnInit {
}
findItemTypeTitle() {
- if (this.item.type === 'source') {
- this.itemTypeTitle = "Data Source";
+ if (this.item.type === 'stream') {
+ this.itemTypeTitle = "Data Stream";
+ } else if (this.item.type === 'set') {
+ this.itemTypeTitle = "Data Set";
} else if (this.item.type === 'sepa') {
this.itemTypeTitle = "Data Processor";
} else {
@@ -81,8 +83,10 @@ export class EndpointItemComponent implements OnInit {
findItemStyle() {
let baseType = "pe-label ";
- if (this.item.type == 'source') {
- this.itemTypeStyle = baseType + "source-label";
+ if (this.item.type == 'stream') {
+ this.itemTypeStyle = baseType + "stream-label";
+ } else if (this.item.type === 'set') {
+ this.itemTypeStyle = baseType + "set-label";
} else if (this.item.type == 'sepa') {
this.itemTypeStyle = baseType + "processor-label";
} else {
@@ -116,4 +120,4 @@ export class EndpointItemComponent implements OnInit {
//this.loadCurrentElements(type);
});
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/connect/services/rest.service.ts b/ui/src/app/connect/services/rest.service.ts
index 6b2fdf6..55a5a94 100644
--- a/ui/src/app/connect/services/rest.service.ts
+++ b/ui/src/app/connect/services/rest.service.ts
@@ -81,7 +81,7 @@ export class RestService {
getSourceDetails(sourceElementId): Observable<DataSourceDescription> {
return this.http
- .get(this.makeUserDependentBaseUrl() + '/sources/' + encodeURIComponent(sourceElementId)).pipe(map(response => {
+ .get(this.makeUserDependentBaseUrl() + '/streams/' + encodeURIComponent(sourceElementId)).pipe(map(response => {
return DataSourceDescription.fromData(response as DataSourceDescription);
}));
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 712f6d4..de8b166 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,11 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-11-25 23:55:30.
+// Generated using typescript-generator version 2.27.744 on 2021-01-02 10:53:55.
export class AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+ elementId: string;
static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
if (!data) {
@@ -30,32 +31,13 @@ export class AbstractStreamPipesEntity {
}
const instance = target || new AbstractStreamPipesEntity();
instance["@class"] = data["@class"];
- return instance;
- }
-}
-
-export class AccessibleSensorActuatorResource {
- connectionInfo: string;
- connectionType: string;
- name: string;
- type: string;
-
- static fromData(data: AccessibleSensorActuatorResource, target?: AccessibleSensorActuatorResource): AccessibleSensorActuatorResource {
- if (!data) {
- return data;
- }
- const instance = target || new AccessibleSensorActuatorResource();
- instance.name = data.name;
- instance.type = data.type;
- instance.connectionInfo = data.connectionInfo;
- instance.connectionType = data.connectionType;
+ instance.elementId = data.elementId;
return instance;
}
}
export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
- elementId: string;
+ "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
if (!data) {
@@ -63,7 +45,6 @@ export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
}
const instance = target || new UnnamedStreamPipesEntity();
super.fromData(data, instance);
- instance.elementId = data.elementId;
return instance;
}
}
@@ -144,12 +125,12 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
connectedTo: string[];
description: string;
dom: string;
- elementId: string;
iconUrl: string;
includedAssets: string[];
includedLocales: string[];
includesAssets: boolean;
includesLocales: boolean;
+ internallyManaged: boolean;
name: string;
uri: string;
@@ -162,16 +143,16 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.name = data.name;
instance.description = data.description;
instance.iconUrl = data.iconUrl;
- instance.elementId = data.elementId;
instance.appId = data.appId;
instance.includesAssets = data.includesAssets;
instance.includesLocales = data.includesLocales;
instance.includedAssets = __getCopyArrayFn(__identity<string>())(data.includedAssets);
instance.includedLocales = __getCopyArrayFn(__identity<string>())(data.includedLocales);
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
+ instance.internallyManaged = data.internallyManaged;
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.uri = data.uri;
instance.dom = data.dom;
+ instance.uri = data.uri;
return instance;
}
}
@@ -206,9 +187,9 @@ export class AdapterDescription extends NamedStreamPipesEntity {
instance.config = __getCopyArrayFn(StaticProperty.fromDataUnion)(data.config);
instance.rules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.rules);
instance.category = __getCopyArrayFn(__identity<string>())(data.category);
- instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
instance.schemaRules = __getCopyArrayFn(__identity<any>())(data.schemaRules);
+ instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
instance.couchDBId = data.couchDBId;
instance._rev = data._rev;
return instance;
@@ -615,26 +596,12 @@ export class ApplicationLink extends UnnamedStreamPipesEntity {
}
}
-export class CPU {
- arch: string;
- cores: number;
-
- static fromData(data: CPU, target?: CPU): CPU {
- if (!data) {
- return data;
- }
- const instance = target || new CPU();
- instance.cores = data.cores;
- instance.arch = data.arch;
- return instance;
- }
-}
-
export class Category {
_id: string;
_rev: string;
internalName: string;
name: string;
+ superLabel: string;
superLabelId: string;
static fromData(data: Category, target?: Category): Category {
@@ -647,6 +614,7 @@ export class Category {
instance.superLabelId = data.superLabelId;
instance._id = data._id;
instance._rev = data._rev;
+ instance.superLabel = data.superLabel;
return instance;
}
}
@@ -751,21 +719,6 @@ export class CreateNestedRuleDescription extends SchemaTransformationRuleDescrip
}
}
-export class Cuda {
- cudaDriverVersion: string;
- cudaRuntimeVersion: string;
-
- static fromData(data: Cuda, target?: Cuda): Cuda {
- if (!data) {
- return data;
- }
- const instance = target || new Cuda();
- instance.cudaDriverVersion = data.cudaDriverVersion;
- instance.cudaRuntimeVersion = data.cudaRuntimeVersion;
- return instance;
- }
-}
-
export class CustomOutputStrategy extends OutputStrategy {
"@class": "org.apache.streampipes.model.output.CustomOutputStrategy";
availablePropertyKeys: string[];
@@ -800,19 +753,6 @@ export class CustomTransformOutputStrategy extends OutputStrategy {
}
}
-export class DISK {
- diskTotal: number;
-
- static fromData(data: DISK, target?: DISK): DISK {
- if (!data) {
- return data;
- }
- const instance = target || new DISK();
- instance.diskTotal = data.diskTotal;
- return instance;
- }
-}
-
export class DashboardEntity extends UnnamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.dashboard.DashboardEntity" | "org.apache.streampipes.model.dashboard.DashboardWidgetModel" | "org.apache.streampipes.model.dashboard.VisualizablePipeline" | "org.apache.streampipes.model.datalake.DataExplorerWidgetModel";
_id: string;
@@ -992,13 +932,6 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
configured: boolean;
correspondingPipeline: string;
correspondingUser: string;
- deploymentRunningInstanceId: string;
- deploymentTargetNodeHostname: string;
- deploymentTargetNodeId: string;
- deploymentTargetNodePort: number;
- elementEndpointHostname: string;
- elementEndpointPort: number;
- elementEndpointServiceName: string;
inputStreams: SpDataStreamUnion[];
staticProperties: StaticPropertyUnion[];
statusInfoSettings: ElementStatusInfoSettings;
@@ -1020,13 +953,6 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
instance.correspondingPipeline = data.correspondingPipeline;
instance.correspondingUser = data.correspondingUser;
instance.streamRequirements = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.streamRequirements);
- instance.elementEndpointHostname = data.elementEndpointHostname;
- instance.elementEndpointPort = data.elementEndpointPort;
- instance.deploymentTargetNodeId = data.deploymentTargetNodeId;
- instance.deploymentTargetNodeHostname = data.deploymentTargetNodeHostname;
- instance.deploymentTargetNodePort = data.deploymentTargetNodePort;
- instance.deploymentRunningInstanceId = data.deploymentRunningInstanceId;
- instance.elementEndpointServiceName = data.elementEndpointServiceName;
instance.configured = data.configured;
instance.uncompleted = data.uncompleted;
return instance;
@@ -1168,25 +1094,6 @@ export class DeleteRuleDescription extends SchemaTransformationRuleDescription {
}
}
-export class Docker {
- apiVersion: string;
- hasDocker: boolean;
- hasNvidiaRuntime: boolean;
- serverVersion: string;
-
- static fromData(data: Docker, target?: Docker): Docker {
- if (!data) {
- return data;
- }
- const instance = target || new Docker();
- instance.hasDocker = data.hasDocker;
- instance.hasNvidiaRuntime = data.hasNvidiaRuntime;
- instance.serverVersion = data.serverVersion;
- instance.apiVersion = data.apiVersion;
- return instance;
- }
-}
-
export class DomainPropertyProbability extends UnnamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.connect.guess.DomainPropertyProbability";
domainProperty: string;
@@ -1675,23 +1582,6 @@ export class Frequency extends EventStreamQualityDefinition {
}
}
-export class GPU {
- cudaCores: number;
- hasGPU: boolean;
- type: string;
-
- static fromData(data: GPU, target?: GPU): GPU {
- if (!data) {
- return data;
- }
- const instance = target || new GPU();
- instance.hasGPU = data.hasGPU;
- instance.cudaCores = data.cudaCores;
- instance.type = data.type;
- return instance;
- }
-}
-
export interface GenericAdapterDescription {
eventSchema: EventSchema;
formatDescription: FormatDescription;
@@ -1754,25 +1644,6 @@ export class GuessSchema extends UnnamedStreamPipesEntity {
}
}
-export class HardwareResource {
- cpu: CPU;
- disk: DISK;
- gpu: GPU;
- memory: MEM;
-
- static fromData(data: HardwareResource, target?: HardwareResource): HardwareResource {
- if (!data) {
- return data;
- }
- const instance = target || new HardwareResource();
- instance.cpu = CPU.fromData(data.cpu);
- instance.memory = MEM.fromData(data.memory);
- instance.disk = DISK.fromData(data.disk);
- instance.gpu = GPU.fromData(data.gpu);
- return instance;
- }
-}
-
export class TransportProtocol extends UnnamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.grounding.TransportProtocol" | "org.apache.streampipes.model.grounding.JmsTransportProtocol" | "org.apache.streampipes.model.grounding.KafkaTransportProtocol" | "org.apache.streampipes.model.grounding.MqttTransportProtocol";
brokerHostname: string;
@@ -1922,19 +1793,6 @@ export class ListOutputStrategy extends OutputStrategy {
}
}
-export class MEM {
- memTotal: number;
-
- static fromData(data: MEM, target?: MEM): MEM {
- if (!data) {
- return data;
- }
- const instance = target || new MEM();
- instance.memTotal = data.memTotal;
- return instance;
- }
-}
-
export class MappingProperty extends StaticProperty {
"@class": "org.apache.streampipes.model.staticproperty.MappingProperty" | "org.apache.streampipes.model.staticproperty.MappingPropertyUnary" | "org.apache.streampipes.model.staticproperty.MappingPropertyNary";
mapsFromOptions: string[];
@@ -2092,93 +1950,6 @@ export class MqttTransportProtocol extends TransportProtocol {
}
}
-export class Node {
- nodeInfo: NodeInfo;
-
- static fromData(data: Node, target?: Node): Node {
- if (!data) {
- return data;
- }
- const instance = target || new Node();
- instance.nodeInfo = NodeInfo.fromData(data.nodeInfo);
- return instance;
- }
-}
-
-export class NodeBrokerInfo {
- host: string;
- port: number;
-
- static fromData(data: NodeBrokerInfo, target?: NodeBrokerInfo): NodeBrokerInfo {
- if (!data) {
- return data;
- }
- const instance = target || new NodeBrokerInfo();
- instance.host = data.host;
- instance.port = data.port;
- return instance;
- }
-}
-
-export class NodeInfo {
- nodeBrokerInfo: NodeBrokerInfo;
- nodeControllerId: string;
- nodeControllerPort: number;
- nodeMetadata: NodeMetadata;
- nodeResources: NodeResources;
- supportedPipelineElementAppIds: string[];
-
- static fromData(data: NodeInfo, target?: NodeInfo): NodeInfo {
- if (!data) {
- return data;
- }
- const instance = target || new NodeInfo();
- instance.nodeControllerId = data.nodeControllerId;
- instance.nodeControllerPort = data.nodeControllerPort;
- instance.nodeMetadata = NodeMetadata.fromData(data.nodeMetadata);
- instance.nodeBrokerInfo = NodeBrokerInfo.fromData(data.nodeBrokerInfo);
- instance.nodeResources = NodeResources.fromData(data.nodeResources);
- instance.supportedPipelineElementAppIds = __getCopyArrayFn(__identity<string>())(data.supportedPipelineElementAppIds);
- return instance;
- }
-}
-
-export class NodeMetadata {
- nodeAddress: string;
- nodeLocationTags: string[];
- nodeModel: string;
- nodeType: string;
-
- static fromData(data: NodeMetadata, target?: NodeMetadata): NodeMetadata {
- if (!data) {
- return data;
- }
- const instance = target || new NodeMetadata();
- instance.nodeAddress = data.nodeAddress;
- instance.nodeModel = data.nodeModel;
- instance.nodeType = data.nodeType;
- instance.nodeLocationTags = __getCopyArrayFn(__identity<string>())(data.nodeLocationTags);
- return instance;
- }
-}
-
-export class NodeResources {
- accessibleSensorActuatorResource: AccessibleSensorActuatorResource[];
- hardwareResource: HardwareResource;
- softwareResource: SoftwareResource;
-
- static fromData(data: NodeResources, target?: NodeResources): NodeResources {
- if (!data) {
- return data;
- }
- const instance = target || new NodeResources();
- instance.hardwareResource = HardwareResource.fromData(data.hardwareResource);
- instance.softwareResource = SoftwareResource.fromData(data.softwareResource);
- instance.accessibleSensorActuatorResource = __getCopyArrayFn(AccessibleSensorActuatorResource.fromData)(data.accessibleSensorActuatorResource);
- return instance;
- }
-}
-
export class Notification {
additionalInformation: string;
description: string;
@@ -2246,6 +2017,7 @@ export class Pipeline extends ElementComposition {
createdByUser: string;
pipelineCategories: string[];
publicElement: boolean;
+ restartOnSystemReboot: boolean;
running: boolean;
startedAt: number;
@@ -2257,6 +2029,7 @@ export class Pipeline extends ElementComposition {
super.fromData(data, instance);
instance.actions = __getCopyArrayFn(DataSinkInvocation.fromData)(data.actions);
instance.running = data.running;
+ instance.restartOnSystemReboot = data.restartOnSystemReboot;
instance.startedAt = data.startedAt;
instance.createdAt = data.createdAt;
instance.publicElement = data.publicElement;
@@ -2287,31 +2060,6 @@ export class PipelineCategory {
}
}
-export class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
- "@class": "org.apache.streampipes.model.node.PipelineElementDockerContainer";
- containerName: string;
- containerPorts: string[];
- envVars: string[];
- imageURI: string;
- labels: { [index: string]: string };
- serviceId: string;
-
- static fromData(data: PipelineElementDockerContainer, target?: PipelineElementDockerContainer): PipelineElementDockerContainer {
- if (!data) {
- return data;
- }
- const instance = target || new PipelineElementDockerContainer();
- super.fromData(data, instance);
- instance.imageURI = data.imageURI;
- instance.containerName = data.containerName;
- instance.serviceId = data.serviceId;
- instance.containerPorts = __getCopyArrayFn(__identity<string>())(data.containerPorts);
- instance.envVars = __getCopyArrayFn(__identity<string>())(data.envVars);
- instance.labels = __getCopyObjectFn(__identity<string>())(data.labels);
- return instance;
- }
-}
-
export class PipelineElementRecommendation {
count: number;
description: string;
@@ -2720,26 +2468,10 @@ export class SimpleTopicDefinition extends TopicDefinition {
}
}
-export class SoftwareResource {
- docker: Docker;
- kernelVersion: string;
- os: string;
-
- static fromData(data: SoftwareResource, target?: SoftwareResource): SoftwareResource {
- if (!data) {
- return data;
- }
- const instance = target || new SoftwareResource();
- instance.os = data.os;
- instance.kernelVersion = data.kernelVersion;
- instance.docker = Docker.fromData(data.docker);
- return instance;
- }
-}
-
export class SpDataStream extends NamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.SpDataStream" | "org.apache.streampipes.model.SpDataSet";
category: string[];
+ correspondingAdapterId: string;
eventGrounding: EventGrounding;
eventSchema: EventSchema;
hasEventStreamQualities: EventStreamQualityDefinitionUnion[];
@@ -2761,6 +2493,7 @@ export class SpDataStream extends NamedStreamPipesEntity {
instance.measurementCapability = __getCopyArrayFn(MeasurementCapability.fromData)(data.measurementCapability);
instance.measurementObject = __getCopyArrayFn(MeasurementObject.fromData)(data.measurementObject);
instance.index = data.index;
+ instance.correspondingAdapterId = data.correspondingAdapterId;
instance.category = __getCopyArrayFn(__identity<string>())(data.category);
return instance;
}
diff --git a/ui/src/app/editor/editor.component.ts b/ui/src/app/editor/editor.component.ts
index 412043a..7d44d3f 100644
--- a/ui/src/app/editor/editor.component.ts
+++ b/ui/src/app/editor/editor.component.ts
@@ -117,10 +117,10 @@ export class EditorComponent implements OnInit {
this.allElements = this.allElements.concat(processors);
this.afterPipelineElementLoaded(0);
});
- this.pipelineElementService.getDataSources().subscribe(sources => {
- let allStreams = this.collectStreams(sources);
- this.availableDataStreams = allStreams.filter(s => !(s instanceof SpDataSet));
- this.availableDataSets = allStreams
+ this.pipelineElementService.getDataStreams().subscribe(streams => {
+ //let allStreams = this.collectStreams(sources);
+ this.availableDataStreams = streams.filter(s => !(s instanceof SpDataSet));
+ this.availableDataSets = streams
.filter(s => s instanceof SpDataSet)
.map(s => s as SpDataSet);
this.allElements = this.allElements.concat(this.availableDataStreams);
diff --git a/ui/src/app/home/components/status.component.ts b/ui/src/app/home/components/status.component.ts
index 48261a0..c10ee51 100644
--- a/ui/src/app/home/components/status.component.ts
+++ b/ui/src/app/home/components/status.component.ts
@@ -20,6 +20,7 @@ import {Component} from "@angular/core";
import {RestApi} from "../../services/rest-api.service";
import {Router} from "@angular/router";
import {NotificationCountService} from "../../services/notification-count-service";
+import {PipelineElementService} from "../../platform-services/apis/pipeline-element.service";
@Component({
selector: 'status',
@@ -32,7 +33,8 @@ export class StatusComponent {
runningPipelines: number = 0;
installedPipelineElements: number = 0;
- constructor(private RestApi: RestApi,
+ constructor(private pipelineElementService: PipelineElementService,
+ private RestApi: RestApi,
private Router: Router,
public NotificationCountService: NotificationCountService) {
@@ -53,12 +55,9 @@ export class StatusComponent {
}
getStreams() {
- this.RestApi.getOwnSources()
- .subscribe((sources) => {
- sources.forEach((source, i, sources) => {
- this.installedPipelineElements += source.spDataStreams.length;
- });
- });
+ this.pipelineElementService.getDataStreams().subscribe(streams => {
+ this.addPipelineElementList(streams);
+ });
};
getProcessors() {
@@ -82,4 +81,4 @@ export class StatusComponent {
navigate(url: string) {
this.Router.navigate([url]);
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/platform-services/apis/pipeline-element.service.ts b/ui/src/app/platform-services/apis/pipeline-element.service.ts
index 6c16d14..3398059 100644
--- a/ui/src/app/platform-services/apis/pipeline-element.service.ts
+++ b/ui/src/app/platform-services/apis/pipeline-element.service.ts
@@ -22,7 +22,7 @@ import {Observable} from "rxjs";
import {
DataProcessorInvocation,
DataSinkInvocation,
- DataSourceDescription
+ DataSourceDescription, SpDataSet, SpDataStream
} from "../../core-model/gen/streampipes-model";
import {PlatformServicesCommons} from "./commons.service";
import {map} from "rxjs/operators";
@@ -46,9 +46,15 @@ export class PipelineElementService {
}));
}
- getDataSources(): Observable<Array<DataSourceDescription>> {
- return this.http.get(this.dataSourcesUrl + "/own").pipe(map(data => {
- return (data as []).map(dpi => DataSourceDescription.fromData(dpi));
+ getDataStreams(): Observable<Array<SpDataStream>> {
+ return this.http.get(this.dataStreamsUrl + "/own").pipe(map(data => {
+ return (data as []).map(dpi => {
+ if (dpi["@class"] == "org.apache.streampipes.model.SpDataSet") {
+ return SpDataSet.fromData(dpi)
+ } else {
+ return SpDataStream.fromData(dpi);
+ }
+ });
}));
}
@@ -63,12 +69,12 @@ export class PipelineElementService {
return this.platformServicesCommons.authUserBasePath() + '/sepas'
}
- private get dataSourcesUrl(): string {
- return this.platformServicesCommons.authUserBasePath() + '/sources'
+ private get dataStreamsUrl(): string {
+ return this.platformServicesCommons.authUserBasePath() + '/streams'
}
private get dataSinksUrl(): string {
return this.platformServicesCommons.authUserBasePath() + '/actions'
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/scss/sp/main.scss b/ui/src/scss/sp/main.scss
index b86fcfc..7ca422a 100644
--- a/ui/src/scss/sp/main.scss
+++ b/ui/src/scss/sp/main.scss
@@ -798,11 +798,16 @@ md-select.md-default-theme .md-select-value.md-select-placeholder, md-select .md
padding: 2px 5px;
}
-.source-label {
+.stream-label {
background: $sp-color-stream;
color: black;
}
+.set-label {
+ background: $sp-color-set;
+ color: black;
+}
+
.processor-label {
background: $sp-color-processor;
}