You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/21 20:47:47 UTC
[incubator-streampipes] branch STREAMPIPES-319 updated:
[STREAMPIPES-319] Improve service discovery for Connect worker and
pipelines
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-319 by this push:
new adefce4 [STREAMPIPES-319] Improve service discovery for Connect worker and pipelines
adefce4 is described below
commit adefce45d3343c2502db56f10069b71d34845732
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Jun 21 22:47:29 2021 +0200
[STREAMPIPES-319] Improve service discovery for Connect worker and pipelines
---
streampipes-backend/pom.xml | 4 +
.../backend/StreamPipesBackendApplication.java | 17 ++--
.../backend/StreamPipesResourceConfig.java | 2 +-
.../NoServiceEndpointsAvailableException.java | 9 +-
.../master/management/AdapterMasterManagement.java | 44 ++++-----
.../master/management/DescriptionManagement.java | 23 ++---
.../master/management/GuessManagement.java | 21 ++--
.../master/management/SourcesManagement.java | 14 ++-
.../connect/container/master/management/Utils.java | 58 -----------
.../management/WorkerAdministrationManagement.java | 32 +++---
.../master/management/WorkerRestClient.java | 77 +++++----------
.../master/management/WorkerUrlProvider.java | 81 +++++++++++++++
.../container/master/rest/AdapterResource.java | 32 +++---
.../container/master/rest/DescriptionResource.java | 41 ++++----
.../master/rest/RuntimeResolvableResource.java | 16 +--
.../container/master/rest/SourcesResource.java | 5 +-
.../master/rest/WorkerAdministrationResource.java | 2 +-
.../connect/container/master/util/Utils.java | 15 ++-
.../connect/container/master/util/WorkerPaths.java | 68 +++++++++++++
.../master/management/WorkerRestClientTest.java | 4 +-
.../init/AdapterServiceResourceProvider.java | 2 +-
.../worker/init/AdapterWorkerContainer.java | 7 +-
.../init/ConnectWorkerDescriptionProvider.java | 19 +---
.../init/ConnectWorkerRegistrationService.java | 9 +-
.../worker/init/ConnectWorkerTagProvider.java | 3 +-
.../container/worker/rest/AdapterResource.java | 2 +-
.../container/worker/rest/GuessResource.java | 2 +-
.../worker/rest/HttpServerAdapterResource.java | 2 +-
.../container/worker/rest/ProtocolResource.java | 2 +-
.../worker/rest/RuntimeResolvableResource.java | 2 +-
.../container/worker/rest/WelcomePageWorker.java | 2 +-
.../container/worker/rest/WorkerResource.java | 2 +-
.../container/base/StreamPipesServiceBase.java | 19 +++-
.../extensions/ExtensionsModelSubmitter.java | 2 +-
.../init/PipelineElementServiceTagProvider.java | 3 +-
.../api/DataProcessorPipelineElementResource.java | 4 +-
.../api/DataSinkPipelineElementResource.java | 4 +-
.../api/DataStreamPipelineElementResource.java | 4 +-
.../container/html/page/WelcomePageGenerator.java | 7 +-
...ndpoint.java => ExtensionsServiceEndpoint.java} | 6 +-
...tem.java => ExtensionsServiceEndpointItem.java} | 10 +-
.../model/connect/adapter/AdapterDescription.java | 11 +++
.../connect/worker/ConnectWorkerContainer.java | 29 +++---
.../model/dashboard/DashboardEntity.java | 3 +-
.../streampipes/manager/assets/AssetFetcher.java | 15 +--
.../streampipes/manager/assets/AssetManager.java | 9 +-
.../manager/endpoint/EndpointFetcher.java | 22 ++---
.../manager/endpoint/EndpointItemFetcher.java | 20 ++--
.../http/ExtensionsServiceEndpointGenerator.java | 65 ++++++++++++
.../http/PipelineElementEndpointGenerator.java | 53 ----------
.../manager/execution/http/PipelineExecutor.java | 63 ++++++++----
.../streampipes/manager/operations/Operations.java | 6 +-
.../manager/setup/CouchDbInstallationStep.java | 8 +-
.../manager/setup/InstallationConfiguration.java | 4 +-
.../setup/PipelineElementInstallationStep.java | 12 +--
.../verification/DataProcessorVerifier.java | 9 +-
.../manager/verification/DataSinkVerifier.java | 9 +-
.../manager/verification/DataStreamVerifier.java | 9 +-
.../manager/verification/ElementVerifier.java | 9 +-
...java => ExtensionsServiceEndpointResource.java} | 39 ++++----
.../svcdiscovery/api/ISpServiceDiscovery.java | 16 ++-
.../api/model/DefaultSpServiceGroups.java | 1 +
.../api/model/DefaultSpServiceTags.java | 10 +-
.../api/model/SpServicePathPrefix.java | 7 +-
.../api/model/SpServiceRegistrationRequest.java | 109 +++++++++++++++++++++
.../svcdiscovery/api/model/SpServiceTagPrefix.java | 3 +-
.../api/model/SpServiceUrlProvider.java | 27 +++--
.../consul/SpConsulServiceDiscovery.java | 67 ++++++-------
.../consul/model/HealthCheckConfiguration.java | 15 ++-
.../base/StreamPipesExtensionsServiceBase.java | 3 +-
...java => IExtensionsServiceEndpointStorage.java} | 10 +-
.../streampipes/storage/api/INoSqlStorage.java | 2 +-
.../storage/couchdb/CouchDbStorageManager.java | 4 +-
...a => ExtensionsServiceEndpointStorageImpl.java} | 37 ++++---
.../connect/services/data-marketplace.service.ts | 6 +-
ui/src/app/core-model/gen/streampipes-model.ts | 26 ++---
76 files changed, 831 insertions(+), 585 deletions(-)
diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml
index e90508f..1185e21 100644
--- a/streampipes-backend/pom.xml
+++ b/streampipes-backend/pom.xml
@@ -69,6 +69,10 @@
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-jaxrs2</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
<!-- Dependency convergence -->
<dependency>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 43ad4a8..99e650e 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -31,7 +31,6 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -45,7 +44,10 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContextListener;
import java.net.UnknownHostException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -237,13 +239,14 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
@Override
protected List<SpServiceTag> getServiceTags() {
return Arrays.asList(
- createSysTag(DefaultSpServiceTags.CORE),
- createSysTag(DefaultSpServiceTags.CONNECT_MASTER),
- createSysTag(DefaultSpServiceTags.STREAMPIPES_CLIENT)
+ DefaultSpServiceTags.CORE,
+ DefaultSpServiceTags.CONNECT_MASTER,
+ DefaultSpServiceTags.STREAMPIPES_CLIENT
);
}
- private SpServiceTag createSysTag(String value) {
- return SpServiceTag.create(SpServiceTagPrefix.SYSTEM, value);
+ @Override
+ protected String getHealthCheckPath() {
+ return "/streampipes-backend";
}
}
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 748a0da..1c9f03b 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -82,7 +82,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
register(PipelineNoUserResource.class);
register(PipelineTemplate.class);
register(PipelineResource.class);
- register(RdfEndpoint.class);
+ register(ExtensionsServiceEndpointResource.class);
register(SemanticEventConsumer.class);
register(SemanticEventProcessingAgent.class);
register(SemanticEventProducer.class);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
similarity index 79%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
index d263f07..13f4e38 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
@@ -15,10 +15,11 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api.model;
+package org.apache.streampipes.commons.exceptions;
-public class DefaultSpServiceGroups {
-
- public static final String CORE = "core";
+public class NoServiceEndpointsAvailableException extends Exception {
+ public NoServiceEndpointsAvailableException(String message) {
+ super(message);
+ }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index c5703fc..faef9ee 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -18,11 +18,12 @@
package org.apache.streampipes.connect.container.master.management;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.connect.adapter.GroundingService;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.Utils;
import org.apache.streampipes.manager.storage.UserService;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.model.SpDataStream;
@@ -50,16 +51,18 @@ public class AdapterMasterManagement {
private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
private IAdapterStorage adapterStorage;
+ private WorkerUrlProvider workerUrlProvider;
public AdapterMasterManagement() {
this.adapterStorage = getAdapterStorage();
+ this.workerUrlProvider = new WorkerUrlProvider();
}
public AdapterMasterManagement(IAdapterStorage adapterStorage) {
this.adapterStorage = adapterStorage;
}
- public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException {
+ public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException, NoServiceEndpointsAvailableException {
IAdapterStorage adapterStorage = getAdapterStorage();
List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
@@ -67,10 +70,10 @@ public class AdapterMasterManagement {
if (ad instanceof AdapterStreamDescription) {
AdapterDescription decryptedAdapterDescription =
new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
- String wUrl = new Utils().getWorkerUrl(decryptedAdapterDescription);
+ String wUrl = workerUrlProvider.getWorkerUrlForAdapter(decryptedAdapterDescription);
- if (wUrl.equals(connectWorkerContainer.getEndpointUrl())) {
- String url = Utils.addUserNameToApi(connectWorkerContainer.getEndpointUrl(),
+ if (wUrl.equals(connectWorkerContainer.getServiceGroup())) {
+ String url = Utils.addUserNameToApi(connectWorkerContainer.getServiceGroup(),
decryptedAdapterDescription.getUserName());
WorkerRestClient.invokeStreamAdapter(url, (AdapterStreamDescription) decryptedAdapterDescription);
@@ -81,7 +84,7 @@ public class AdapterMasterManagement {
}
public String addAdapter(AdapterDescription ad,
- String baseUrl,
+ String endpointUrl,
String username)
throws AdapterException {
@@ -90,12 +93,9 @@ public class AdapterMasterManagement {
ad.setEventGrounding(eventGrounding);
String uuid = UUID.randomUUID().toString();
-
-// String newId = ConnectContainerConfig.INSTANCE.getConnectContainerMasterUrl() + "api/v1/" + username + "/master/sources/" + uuid;
- String newId = BackendConfig.INSTANCE.getBackendApiUrl() + "api/v2/connect/" + username + "/master/sources/" + uuid;
-
- ad.setElementId(newId);
+ ad.setElementId(ad.getElementId() + ":" + uuid);
ad.setCreatedAt(System.currentTimeMillis());
+ ad.setSelectedEndpointUrl(endpointUrl);
AdapterDescription encryptedAdapterDescription =
new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
@@ -105,16 +105,12 @@ public class AdapterMasterManagement {
// start when stream adapter
if (ad instanceof AdapterStreamDescription) {
// TODO
- WorkerRestClient.invokeStreamAdapter(baseUrl, adapterId);
+ WorkerRestClient.invokeStreamAdapter(endpointUrl, adapterId);
LOG.info("Start adapter");
}
- // backend url is used to install data source in streampipes
- String backendBaseUrl = BackendConfig.INSTANCE.getBackendApiUrl() + "api/v2/";
- String requestUrl = backendBaseUrl + "noauth/users/" + username + "/element";
-
- LOG.info("Install source (source URL: " + newId + " in backend over URL: " + requestUrl);
- SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(newId);
+ LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+ SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
installDataSource(storedDescription, username);
return storedDescription.getElementId();
@@ -124,7 +120,7 @@ public class AdapterMasterManagement {
try {
new DataStreamVerifier(stream).verifyAndAdd(username, true, true);
} catch (SepaParseException e) {
- LOG.error("Error while installing data source: " + stream.getElementId(), e);
+ LOG.error("Error while installing data source: {}", stream.getElementId(), e);
throw new AdapterException();
}
}
@@ -143,14 +139,14 @@ public class AdapterMasterManagement {
throw new AdapterException("Could not find adapter with id: " + id);
}
- public void deleteAdapter(String id, String baseUrl) throws AdapterException {
+ public void deleteAdapter(String id) throws AdapterException {
// // IF Stream adapter delete it
boolean isStreamAdapter = isStreamAdapter(id);
+ AdapterDescription ad = adapterStorage.getAdapter(id);
if (isStreamAdapter) {
- stopStreamAdapter(id, baseUrl);
+ stopStreamAdapter(id, ad.getSelectedEndpointUrl());
}
- AdapterDescription ad = adapterStorage.getAdapter(id);
String username = ad.getUserName();
adapterStorage.deleteAdapter(id);
@@ -190,16 +186,16 @@ public class AdapterMasterManagement {
throw new AdapterException("Adapter " + adapterId + "is not a stream adapter.");
} else {
WorkerRestClient.stopStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
-
}
}
public void startStreamAdapter(String adapterId, String baseUrl) throws AdapterException {
AdapterDescription ad = adapterStorage.getAdapter(adapterId);
-
if (!isStreamAdapter(adapterId)) {
throw new AdapterException("Adapter " + adapterId + "is not a stream adapter.");
} else {
+ ad.setSelectedEndpointUrl(baseUrl);
+ adapterStorage.updateAdapter(ad);
WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
}
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index 18a21a8..b2e60a0 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -87,27 +87,16 @@ public class DescriptionManagement {
.findFirst();
}
- public String getAdapterAssets(AdapterDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getAdapterAssets(baseUrl, desc);
+ public String getAssets(String baseUrl) throws AdapterException {
+ return WorkerRestClient.getAssets(baseUrl);
}
- public byte[] getAdapterIconAsset(AdapterDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getAdapterIconAsset(baseUrl, desc);
+ public byte[] getIconAsset(String baseUrl) throws AdapterException {
+ return WorkerRestClient.getIconAsset(baseUrl);
}
- public String getAdapterDocumentationAsset(AdapterDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getAdapterDocumentationAsset(baseUrl, desc);
+ public String getDocumentationAsset(String baseUrl) throws AdapterException {
+ return WorkerRestClient.getDocumentationAsset(baseUrl);
}
- public String getProtocolAssets(ProtocolDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getProtocolAssets(baseUrl, desc);
- }
-
- public byte[] getProtocolIconAsset(ProtocolDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getProtocolIconAsset(baseUrl, desc);
- }
-
- public String getProtocolDocumentationAsset(ProtocolDescription desc, String baseUrl) throws AdapterException {
- return WorkerRestClient.getProtocolDocumentationAsset(baseUrl, desc);
- }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index e825369..0d867bb 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -25,9 +25,11 @@ import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.message.ErrorMessage;
@@ -40,19 +42,19 @@ import java.io.IOException;
public class GuessManagement {
private static Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
+ private WorkerUrlProvider workerUrlProvider;
public GuessManagement() {
-
+ this.workerUrlProvider = new WorkerUrlProvider();
}
public GuessSchema guessSchema(AdapterDescription adapterDescription) throws AdapterException, ParseException, WorkerAdapterException {
- String workerUrl = new Utils().getWorkerUrl(adapterDescription);
-
- workerUrl = workerUrl + "api/v1/admin@streampipes.de/worker/guess/schema";
+ try {
+ String workerUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
- ObjectMapper mapper = JacksonSerializer.getObjectMapper();
+ workerUrl = workerUrl + WorkerPaths.getGuessSchemaPath();
- try {
+ ObjectMapper mapper = JacksonSerializer.getObjectMapper();
String ad = mapper.writeValueAsString(adapterDescription);
LOG.info("Guess schema at: " + workerUrl);
Response requestResponse = Request.Post(workerUrl)
@@ -65,8 +67,7 @@ public class GuessManagement {
String responseString = EntityUtils.toString(httpResponse.getEntity());
if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- GuessSchema guessSchema = mapper.readValue(responseString, GuessSchema.class);
- return guessSchema;
+ return mapper.readValue(responseString, GuessSchema.class);
} else {
ErrorMessage errorMessage = mapper.readValue(responseString, ErrorMessage.class);
@@ -74,9 +75,9 @@ public class GuessManagement {
throw new WorkerAdapterException(errorMessage);
}
- } catch (IOException e) {
+ } catch (IOException | NoServiceEndpointsAvailableException e) {
LOG.error(e.getMessage());
- throw new AdapterException("Error in connect worker: " + workerUrl, e);
+ throw new AdapterException("Error in connect worker: ", e);
}
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 94f9148..bba0366 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -18,9 +18,11 @@
package org.apache.streampipes.connect.container.master.management;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.Utils;
import org.apache.streampipes.container.html.JSONGenerator;
import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
import org.apache.streampipes.container.html.model.Description;
@@ -47,6 +49,7 @@ public class SourcesManagement {
private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
private AdapterStorageImpl adapterStorage;
+ private WorkerUrlProvider workerUrlProvider;
private String connectHost = null;
public SourcesManagement(AdapterStorageImpl adapterStorage) {
@@ -54,10 +57,11 @@ public class SourcesManagement {
}
public SourcesManagement() {
- this.adapterStorage = new AdapterStorageImpl();
+ this(new AdapterStorageImpl());
+ this.workerUrlProvider = new WorkerUrlProvider();
}
- public void addAdapter(String streamId, SpDataSet dataSet, String username) throws AdapterException {
+ public void addAdapter(String streamId, SpDataSet dataSet, String username) throws AdapterException, NoServiceEndpointsAvailableException {
String newUrl = getAdapterUrl(streamId, username);
@@ -74,7 +78,7 @@ public class SourcesManagement {
WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
}
- public void detachAdapter(String streamId, String runningInstanceId, String username) throws AdapterException {
+ public void detachAdapter(String streamId, String runningInstanceId, String username) throws AdapterException, NoServiceEndpointsAvailableException {
AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
String newId = adapterDescription.getUri() + "/streams/" + runningInstanceId;
@@ -85,7 +89,7 @@ public class SourcesManagement {
WorkerRestClient.stopSetAdapter(newUrl, adapterDescription);
}
- private String getAdapterUrl(String streamId, String username) {
+ private String getAdapterUrl(String streamId, String username) throws NoServiceEndpointsAvailableException {
String appId = "";
List<AdapterDescription> adapterDescriptions = this.adapterStorage.getAllAdapters();
for (AdapterDescription ad : adapterDescriptions) {
@@ -93,7 +97,7 @@ public class SourcesManagement {
appId = ad.getAppId();
}
}
- String workerUrl = new Utils().getWorkerUrlById(appId);
+ String workerUrl = workerUrlProvider.getWorkerBaseUrl(appId);
return Utils.addUserNameToApi(workerUrl, username);
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java
deleted file mode 100644
index bd53be0..0000000
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.container.master.management;
-
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-
-public class Utils {
- private WorkerAdministrationManagement workerAdministrationManagement;
-
- public Utils() {
- this.workerAdministrationManagement = new WorkerAdministrationManagement();
- }
-
- public static String addUserNameToApi(String url, String userName) {
- return url + "api/v1/" + userName + "/";
- }
-
- public String getWorkerUrl(AdapterDescription adapterDescription) {
- String id = "";
-
- if (adapterDescription instanceof GenericAdapterDescription) {
- id = ((GenericAdapterDescription) (adapterDescription)).getProtocolDescription().getAppId();
- } else {
- id = adapterDescription.getAppId();
- }
-
- return this.workerAdministrationManagement.getWorkerUrl(id);
- }
-
- public String getWorkerUrl(ProtocolDescription protocolDescription) {
- String id = protocolDescription.getAppId();
-
- return this.workerAdministrationManagement.getWorkerUrl(id);
- }
-
-
- public String getWorkerUrlById(String id) {
- return this.workerAdministrationManagement.getWorkerUrl(id);
- }
-}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index ed3e99a..74d627e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -18,13 +18,14 @@
package org.apache.streampipes.connect.container.master.management;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
@@ -48,17 +49,17 @@ public class WorkerAdministrationManagement {
boolean alreadyRegistered = false;
for (ConnectWorkerContainer c : allConnectWorkerContainers) {
- if (c.getEndpointUrl().equals(connectWorker.getEndpointUrl())) {
+ if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
boolean adaptersChanged = false;
for (AdapterDescription a : c.getAdapters()) {
- if (!connectWorker.getAdapters().stream().anyMatch(ad -> ad.getAdapterId().equals(a.getAdapterId()))) {
+ if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getAdapterId().equals(a.getAdapterId()))) {
adaptersChanged = true;
}
}
for (ProtocolDescription p : c.getProtocols()) {
- if (!connectWorker.getProtocols().stream().anyMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
+ if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
adaptersChanged = true;
}
}
@@ -66,40 +67,29 @@ public class WorkerAdministrationManagement {
if (!adaptersChanged) {
alreadyRegistered = true;
} else {
- LOG.info("Remove old connect worker: " + connectWorker.getEndpointUrl());
+ LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
}
}
-
}
// IF NOT REGISTERED
// Store Connect Worker in DB
if (!alreadyRegistered) {
this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
- LOG.info("Stored new connect worker: " + connectWorker.getEndpointUrl() + " in database");
+ LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
} else {
try {
this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
} catch (AdapterException e) {
- LOG.error("Could not start adapters on worker: " + connectWorker.getEndpointUrl());
+ LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
+ } catch (NoServiceEndpointsAvailableException e) {
+ LOG.error("Could not start adapter due to missing endpoint");
}
}
}
- public String getWorkerUrl(String id) {
- String workerUrl = "";
- List<ConnectWorkerContainer> allConnectWorkerContainer = this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
- for (ConnectWorkerContainer connectWorkerContainer : allConnectWorkerContainer) {
- if (connectWorkerContainer.getProtocols().stream().anyMatch(p -> p.getAppId().equals(id))) {
- workerUrl = connectWorkerContainer.getEndpointUrl();
- } else if (connectWorkerContainer.getAdapters().stream().anyMatch(a -> a.getAppId().equals(id))) {
- workerUrl = connectWorkerContainer.getEndpointUrl();
- }
- }
- return workerUrl;
- }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index fb91f5f..cb37eb2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -22,10 +22,10 @@ import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.model.util.Cloner;
@@ -47,37 +47,36 @@ public class WorkerRestClient {
private static final Logger logger = LoggerFactory.getLogger(WorkerRestClient.class);
- public static void invokeStreamAdapter(String baseUrl, String adapterId) throws AdapterException {
- invokeStreamAdapter(baseUrl, (AdapterStreamDescription) getAndDecryptAdapter(adapterId));
+ public static void invokeStreamAdapter(String endpointUrl, String adapterId) throws AdapterException {
+ invokeStreamAdapter(endpointUrl, (AdapterStreamDescription) getAndDecryptAdapter(adapterId));
}
- public static void invokeStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
-
- String url = baseUrl + "worker/stream/invoke";
+ public static void invokeStreamAdapter(String endpointUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+ String url = endpointUrl + WorkerPaths.getStreamInvokePath();
startAdapter(url, adapterStreamDescription);
updateStreamAdapterStatus(adapterStreamDescription.getId(), true);
}
public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
- String url = baseUrl + "worker/stream/stop";
+ String url = baseUrl + WorkerPaths.getStreamStopPath();
AdapterDescription ad = getAdapterDescriptionById(new AdapterStorageImpl(), adapterStreamDescription.getUri());
- stopAdapter(adapterStreamDescription.getId(), ad, url);
+ stopAdapter(ad, url);
updateStreamAdapterStatus(adapterStreamDescription.getId(), false);
}
public static void invokeSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
- String url = baseUrl + "worker/set/invoke";
+ String url = baseUrl + WorkerPaths.getSetInvokePath();
startAdapter(url, adapterSetDescription);
}
public static void stopSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
- String url = baseUrl + "worker/set/stop";
+ String url = baseUrl + WorkerPaths.getSetStopPath();
- stopAdapter(adapterSetDescription.getUri(), adapterSetDescription, url);
+ stopAdapter(adapterSetDescription, url);
}
public static void startAdapter(String url, AdapterDescription ad) throws AdapterException {
@@ -106,11 +105,12 @@ public class WorkerRestClient {
}
- public static void stopAdapter(String adapterId, AdapterDescription ad, String url) throws AdapterException {
+ public static void stopAdapter(AdapterDescription ad,
+ String url) throws AdapterException {
// Stop execution of adatper
try {
- logger.info("Trying to stopAdapter adpater on endpoint: " + url);
+ logger.info("Trying to stopAdapter adapter on endpoint: " + url);
String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
@@ -130,9 +130,10 @@ public class WorkerRestClient {
}
- public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, String elementId, String username, RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException {
- String element = encodeValue(elementId);
- String url = workerEndpoint + "api/v1/" + username + "/worker/resolvable/" + element + "/configurations";
+ public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
+ String appId,
+ RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException {
+ String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);
try {
String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
@@ -148,20 +149,11 @@ public class WorkerRestClient {
e.printStackTrace();
throw new AdapterException("Could not resolve runtime configurations from " + url);
}
-
- }
-
- public static String getAdapterAssets(String baseUrl, AdapterDescription ad) throws AdapterException {
- return getAssets(baseUrl + "worker/adapters", ad.getAppId());
}
- public static String getProtocolAssets(String baseUrl, ProtocolDescription ad) throws AdapterException {
- return getAssets(baseUrl + "worker/protocol", ad.getAppId());
- }
-
- private static String getAssets(String baseUrl, String appId) throws AdapterException {
- String url = baseUrl + "/" + appId + "/assets";
- logger.info("Trying to Assets from endpoint: " + url + " for adapter: " + appId);
+ public static String getAssets(String workerPath) throws AdapterException {
+ String url = workerPath + "/assets";
+ logger.info("Trying to Assets from endpoint: " + url);
try {
return Request.Get(url)
@@ -170,22 +162,13 @@ public class WorkerRestClient {
.execute().returnContent().asString();
} catch (IOException e) {
logger.error(e.getMessage());
- throw new AdapterException("Could not get assets endpoint: " + url + " for adapter: " + appId);
+ throw new AdapterException("Could not get assets endpoint: " + url);
}
}
- public static byte[] getAdapterIconAsset(String baseUrl, AdapterDescription ad) throws AdapterException {
- return getIconAsset(baseUrl + "worker/adapters", ad.getAppId());
- }
-
- public static byte[] getProtocolIconAsset(String baseUrl, ProtocolDescription ad) throws AdapterException {
- return getIconAsset(baseUrl + "worker/protocols", ad.getAppId());
-
- }
-
- private static byte[] getIconAsset(String baseUrl, String appId) throws AdapterException {
- String url = baseUrl + "/" + appId + "/assets/icon";
+ public static byte[] getIconAsset(String baseUrl) throws AdapterException {
+ String url = baseUrl + "/assets/icon";
try {
byte[] responseString = Request.Get(url)
@@ -199,16 +182,8 @@ public class WorkerRestClient {
}
}
- public static String getAdapterDocumentationAsset(String baseUrl, AdapterDescription ad) throws AdapterException {
- return getDocumentationAsset(baseUrl + "worker/adapters", ad.getAppId());
- }
-
- public static String getProtocolDocumentationAsset(String baseUrl, ProtocolDescription ad) throws AdapterException {
- return getDocumentationAsset(baseUrl + "worker/protocols", ad.getAppId());
- }
-
- private static String getDocumentationAsset(String baseUrl, String appId) throws AdapterException {
- String url = baseUrl + "/" + appId + "/assets/documentation";
+ public static String getDocumentationAsset(String baseUrl) throws AdapterException {
+ String url = baseUrl + "/assets/documentation";
try {
return Request.Get(url)
@@ -217,7 +192,7 @@ public class WorkerRestClient {
.execute().returnContent().asString();
} catch (IOException e) {
logger.error(e.getMessage());
- throw new AdapterException("Could not get documentation endpoint: " + url + " for adapter: " + appId);
+ throw new AdapterException("Could not get documentation endpoint: " + url);
}
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
new file mode 100644
index 0000000..efc5b4c
--- /dev/null
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.management;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.http.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.storage.api.IConnectWorkerContainerStorage;
+import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.util.List;
+
+public class WorkerUrlProvider {
+ // WorkerUrlProvider: maps an adapter/protocol appId to a worker URL obtained from an ExtensionsServiceEndpointGenerator.
+ private IConnectWorkerContainerStorage connectWorkerContainerStorage;
+ // Creates the provider with a ConnectionWorkerContainerStorageImpl as its worker-container storage.
+ public WorkerUrlProvider() {
+ this.connectWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
+ }
+ // Returns the resource URL for an adapter; a GenericAdapterDescription is looked up by the appId of its protocol description.
+ public String getWorkerUrlForAdapter(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
+ String id = "";
+
+ if (adapterDescription instanceof GenericAdapterDescription) {
+ id = ((GenericAdapterDescription) (adapterDescription)).getProtocolDescription().getAppId();
+ } else {
+ id = adapterDescription.getAppId();
+ }
+
+ return getWorkerUrl(id);
+ }
+ // Returns the resource URL for the given protocol description, looked up by its appId.
+ public String getWorkerUrlForProtocol(ProtocolDescription protocolDescription) throws NoServiceEndpointsAvailableException {
+ String id = protocolDescription.getAppId();
+
+ return getWorkerUrl(id);
+ }
+ // Returns getEndpointResourceUrl() of the endpoint generator built for appId.
+ public String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
+ return getEndpointGenerator(appId).getEndpointResourceUrl();
+ }
+ // Returns getEndpointBaseUrl() of the endpoint generator built for appId.
+ public String getWorkerBaseUrl(String appId) throws NoServiceEndpointsAvailableException {
+ return getEndpointGenerator(appId).getEndpointBaseUrl();
+ }
+ // Builds the endpoint generator for appId, typed PROTOCOL or ADAPTER depending on which list of a stored worker container holds it.
+ private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) {
+ SpServiceUrlProvider provider = null;
+ List<ConnectWorkerContainer> allConnectWorkerContainer = this.connectWorkerContainerStorage.getAllConnectWorkerContainers();
+ // Every container is scanned without an early exit, so the last matching container decides the provider type.
+ for (ConnectWorkerContainer connectWorkerContainer : allConnectWorkerContainer) {
+ if (connectWorkerContainer.getProtocols().stream().anyMatch(p -> p.getAppId().equals(appId))) {
+ provider = SpServiceUrlProvider.PROTOCOL;
+ } else if (connectWorkerContainer.getAdapters().stream().anyMatch(a -> a.getAppId().equals(appId))) {
+ provider = SpServiceUrlProvider.ADAPTER;
+ }
+ }
+ // NOTE(review): provider is still null here when no stored container lists appId - confirm the generator tolerates a null provider.
+ return new ExtensionsServiceEndpointGenerator(appId, provider);
+ }
+
+}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
index 8a9887d..b887d98 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
@@ -18,9 +18,10 @@
package org.apache.streampipes.connect.container.master.rest;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
-import org.apache.streampipes.connect.container.master.management.Utils;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
import org.apache.streampipes.model.message.Notifications;
@@ -37,9 +38,11 @@ import java.util.List;
public class AdapterResource extends AbstractAdapterResource<AdapterMasterManagement> {
private Logger LOG = LoggerFactory.getLogger(AdapterResource.class);
+ private WorkerUrlProvider workerUrlProvider;
public AdapterResource() {
super(AdapterMasterManagement::new);
+ this.workerUrlProvider = new WorkerUrlProvider();
}
@POST
@@ -52,9 +55,9 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
LOG.info("User: " + username + " starts adapter " + adapterDescription.getAdapterId());
try {
- String workerUrl = getModifiedWorkerUrl(adapterDescription, username);
- adapterId = managementService.addAdapter(adapterDescription, workerUrl, username);
- } catch (AdapterException e) {
+ String workerBaseUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
+ adapterId = managementService.addAdapter(adapterDescription, workerBaseUrl, username);
+ } catch (AdapterException | NoServiceEndpointsAvailableException e) {
LOG.error("Error while starting adapter with id " + adapterDescription.getAppId(), e);
return ok(Notifications.error(e.getMessage()));
}
@@ -86,8 +89,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
public Response stopAdapter(@PathParam("id") String adapterId,
@PathParam("username") String username) {
try {
- String workerUrl = getModifiedWorkerUrl(adapterId, username);
- managementService.stopStreamAdapter(adapterId, workerUrl);
+ managementService.stopStreamAdapter(adapterId, getAdapterDescription(adapterId).getSelectedEndpointUrl());
return ok(Notifications.success("Adapter started"));
} catch (AdapterException e) {
LOG.error("Could not stop adapter with id " +adapterId, e);
@@ -102,10 +104,10 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
public Response startAdapter(@PathParam("id") String adapterId,
@PathParam("username") String username) {
try {
- String workerUrl = getModifiedWorkerUrl(adapterId, username);
+ String workerUrl = workerUrlProvider.getWorkerBaseUrl(getAdapterDescription(adapterId).getAppId());
managementService.startStreamAdapter(adapterId, workerUrl);
return ok(Notifications.success("Adapter stopped"));
- } catch (AdapterException e) {
+ } catch (AdapterException | NoServiceEndpointsAvailableException e) {
LOG.error("Could not start adapter with id " +adapterId, e);
return ok(Notifications.error(e.getMessage()));
}
@@ -119,8 +121,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
@PathParam("username") String username) {
try {
- String workerUrl = getModifiedWorkerUrl(adapterId, username);
- managementService.deleteAdapter(adapterId, workerUrl);
+ managementService.deleteAdapter(adapterId);
return ok(Notifications.success("Adapter deleted."));
} catch (AdapterException e) {
LOG.error("Error while deleting adapter with id " + adapterId, e);
@@ -148,15 +149,4 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
return managementService.getAdapter(adapterId);
}
- private String getModifiedWorkerUrl(String adapterId,
- String username) throws AdapterException {
- AdapterDescription adapterDescription = getAdapterDescription(adapterId);
- return getModifiedWorkerUrl(adapterDescription, username);
- }
-
- private String getModifiedWorkerUrl(AdapterDescription adapterDescription,
- String username) throws AdapterException {
- String workerUrl = new Utils().getWorkerUrlById(adapterDescription.getAppId());
- return Utils.addUserNameToApi(workerUrl, username);
- }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
index 7132e09..a247193 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
@@ -17,9 +17,10 @@
package org.apache.streampipes.connect.container.master.rest;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
-import org.apache.streampipes.connect.container.master.management.Utils;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
@@ -41,9 +42,11 @@ import java.util.Optional;
public class DescriptionResource extends AbstractAdapterResource<DescriptionManagement> {
private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class);
+ private WorkerUrlProvider workerUrlProvider;
public DescriptionResource() {
super(DescriptionManagement::new);
+ workerUrlProvider = new WorkerUrlProvider();
}
@GET
@@ -86,19 +89,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
if (adapterDescriptionOptional.isPresent()) {
AdapterDescription adapterDescription = adapterDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(adapterDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
- result = managementService.getAdapterAssets(adapterDescription, newUrl);
+ result = managementService.getAssets(workerUrl);
}
Optional<ProtocolDescription> protocolDescriptionOptional = managementService.getProtocol(id);
if (protocolDescriptionOptional.isPresent()) {
ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(protocolDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
- result = managementService.getProtocolAssets(protocolDescription, newUrl);
+ result = managementService.getAssets(workerUrl);
}
if (result == null) {
@@ -110,6 +111,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
} catch (AdapterException e) {
LOG.error("Not found adapter with id " + id, e);
return fail();
+ } catch (NoServiceEndpointsAvailableException e) {
+ return fail();
}
}
@@ -124,19 +127,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
if (adapterDescriptionOptional.isPresent()) {
AdapterDescription adapterDescription = adapterDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(adapterDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
- result = managementService.getAdapterIconAsset(adapterDescription, newUrl);
+ result = managementService.getIconAsset(workerUrl);
}
Optional<ProtocolDescription> protocolDescriptionOptional = managementService.getProtocol(id);
if (protocolDescriptionOptional.isPresent()) {
ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(protocolDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
- result = managementService.getProtocolIconAsset(protocolDescription, newUrl);
+ result = managementService.getIconAsset(workerUrl);
}
if (result == null) {
@@ -148,6 +149,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
} catch (AdapterException e) {
LOG.error("Not found adapter with id " + id);
return fail();
+ } catch (NoServiceEndpointsAvailableException e) {
+ return fail();
}
}
@@ -161,19 +164,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
if (adapterDescriptionOptional.isPresent()) {
AdapterDescription adapterDescription = adapterDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(adapterDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
- result = managementService.getAdapterDocumentationAsset(adapterDescription, newUrl);
+ result = managementService.getDocumentationAsset(workerUrl);
}
Optional<ProtocolDescription> protocolDescriptionOptional = managementService.getProtocol(id);
if (protocolDescriptionOptional.isPresent()) {
ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
- String workerUrl = new Utils().getWorkerUrl(protocolDescription);
- String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+ String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
- result = managementService.getProtocolDocumentationAsset(protocolDescription, newUrl);
+ result = managementService.getDocumentationAsset(workerUrl);
}
if (result == null) {
@@ -185,6 +186,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
} catch (AdapterException e) {
LOG.error("Not found adapter with id " + id, e);
return fail();
+ } catch (NoServiceEndpointsAvailableException e) {
+ return fail();
}
}
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
index df640df..0294dcb 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
@@ -18,9 +18,11 @@
package org.apache.streampipes.connect.container.master.rest;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.WorkerAdministrationManagement;
import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -33,9 +35,11 @@ import javax.ws.rs.core.Response;
public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdministrationManagement> {
private static final String SP_NS = "https://streampipes.org/vocabulary/v1/";
+ private WorkerUrlProvider workerUrlProvider;
public RuntimeResolvableResource() {
super(WorkerAdministrationManagement::new);
+ this.workerUrlProvider = new WorkerUrlProvider();
}
@POST
@@ -43,22 +47,18 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdm
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
- public Response fetchConfigurations(@PathParam("id") String elementId,
+ public Response fetchConfigurations(@PathParam("id") String appId,
@PathParam("username") String username,
RuntimeOptionsRequest runtimeOptionsRequest) {
// TODO add solution for formats
-// ResolvesContainerProvidedOptions runtimeResolvableOptions = RuntimeResovable.getRuntimeResolvableFormat(elementId);
-
- String id = elementId.replaceAll("sp:", SP_NS);
- String workerEndpoint = managementService.getWorkerUrl(id);
try {
-
- RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, elementId, username, runtimeOptionsRequest);
+ String workerEndpoint = workerUrlProvider.getWorkerBaseUrl(appId);
+ RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest);
return ok(result);
- } catch (AdapterException e) {
+ } catch (AdapterException | NoServiceEndpointsAvailableException e) {
e.printStackTrace();
return fail();
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
index 9cfb19b..7d5e678 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.container.master.rest;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.SourcesManagement;
import org.apache.streampipes.model.SpDataSet;
@@ -79,7 +80,7 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
try {
managementService.addAdapter(elementId, dataSet, username);
- } catch (AdapterException e) {
+ } catch (AdapterException | NoServiceEndpointsAvailableException e) {
LOG.error("Could not set data set instance: " + dataSet.getUri(), e);
return ok(Notifications.error("Could not set data set instance: " + dataSet.getUri()));
}
@@ -98,7 +99,7 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
try {
managementService.detachAdapter(elementId, runningInstanceId, username);
- } catch (AdapterException e) {
+ } catch (AdapterException | NoServiceEndpointsAvailableException e) {
LOG.error("Could not set set id "+ elementId + " with instance id: "+ runningInstanceId, e);
return fail();
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
index d421f4c..b601d60 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
@@ -47,7 +47,7 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
public Response addWorkerContainer(ConnectWorkerContainer connectWorkerContainer) {
- LOG.info("Worker container: " + connectWorkerContainer.getEndpointUrl() + " was detected");
+ LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
this.workerAdministrationManagement.register(connectWorkerContainer);
return ok(Notifications.success("Worker Container sucessfully added"));
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
similarity index 72%
copy from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
copy to streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
index 861bbe3..846944f 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
@@ -16,17 +16,14 @@
*
*/
-package org.apache.streampipes.storage.api;
+package org.apache.streampipes.connect.container.master.util;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+public class Utils {
-import java.util.List;
+ public static String addUserNameToApi(String url, String userName) {
+ //return url;
+ return url + "/api/v1/" + userName + "/";
+ }
-public interface IRdfEndpointStorage {
- void addRdfEndpoint(RdfEndpoint rdfEndpoint);
-
- void removeRdfEndpoint(String rdfEndpointId);
-
- List<RdfEndpoint> getRdfEndpoints();
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
new file mode 100644
index 0000000..2b5a624
--- /dev/null
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.util;
+
+public class WorkerPaths {
+ // WorkerPaths: static builders for the worker REST paths; every returned path starts with "/api/v1/worker".
+ private static final String WorkerMainPath = "/api/v1/worker";
+ private static final String Slash = "/";
+ // Returns the stream-adapter invoke path ("/api/v1/worker/stream/invoke").
+ public static String getStreamInvokePath() {
+ return WorkerMainPath + "/stream/invoke";
+ }
+ // Returns the stream-adapter stop path ("/api/v1/worker/stream/stop").
+ public static String getStreamStopPath() {
+ return WorkerMainPath + "/stream/stop";
+ }
+ // Returns the set-adapter invoke path ("/api/v1/worker/set/invoke").
+ public static String getSetInvokePath() {
+ return WorkerMainPath + "/set/invoke";
+ }
+ // Returns the set-adapter stop path ("/api/v1/worker/set/stop").
+ public static String getSetStopPath() {
+ return WorkerMainPath + "/set/stop";
+ }
+ // Returns the runtime-resolvable configurations path; elementId is concatenated as-is, with no URL encoding applied here.
+ public static String getRuntimeResolvablePath(String elementId) {
+ return WorkerMainPath + "/resolvable/" + elementId + "/configurations";
+ }
+ // Returns the adapters collection path ("/api/v1/worker/adapters").
+ public static String getAdaptersPath() {
+ return WorkerMainPath + "/adapters";
+ }
+ // Returns the protocols collection path ("/api/v1/worker/protocols").
+ public static String getProtocolsPath() {
+ return WorkerMainPath + "/protocols";
+ }
+ // Returns the adapters path followed by "/" and the given appId.
+ public static String getAdaptersPath(String appId) {
+ return getAdaptersPath() + Slash + appId;
+ }
+ // Returns the protocols path followed by "/" and the given appId.
+ public static String getProtocolsPath(String appId) {
+ return getProtocolsPath() + Slash + appId;
+ }
+ // Returns the schema-guess path ("/api/v1/worker/guess/schema").
+ public static String getGuessSchemaPath() {
+ return WorkerMainPath + "/guess/schema";
+ }
+
+
+
+
+}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
index ae186f3..eae5336 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
@@ -59,7 +59,7 @@ public class WorkerRestClientTest {
WorkerRestClient.stopStreamAdapter("", description);
verifyStatic(WorkerRestClient.class, times(1));
- WorkerRestClient.stopAdapter(anyString(), any(), eq("worker/stream/stop"));
+ WorkerRestClient.stopAdapter(any(), eq("worker/stream/stop"));
}
@@ -109,7 +109,7 @@ public class WorkerRestClientTest {
WorkerRestClient.stopSetAdapter("", description);
verifyStatic(WorkerRestClient.class, times(1));
- WorkerRestClient.stopAdapter(anyString(), any(), eq("worker/set/stop"));
+ WorkerRestClient.stopAdapter(any(), eq("worker/set/stop"));
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
index 9b69ebd..2db077a 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
@@ -28,7 +28,7 @@ public class AdapterServiceResourceProvider implements ExtensionsResourceProvide
@Override
public List<Class<?>> getResourceClasses() {
- return Arrays.asList(//WelcomePageWorker.class,
+ return Arrays.asList(WelcomePageWorker.class,
GuessResource.class,
RuntimeResolvableResource.class,
WorkerResource.class,
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index b6b3f6a..4f434f8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -49,11 +49,16 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
@Override
public void afterServiceRegistered(SpServiceDefinition serviceDef) {
- new ConnectWorkerRegistrationService().registerWorker();
+ new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
}
@Override
public void onExit() {
deregisterService(DeclarersSingleton.getInstance().getServiceId());
}
+
+ @Override
+ protected String getHealthCheckPath() { // overrides the health-check path used for this worker service
+ return "/worker"; // same path WelcomePageWorker is mapped to via its @Path annotation in this commit
+ }
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index ac98daf..1ec7506 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.locales.LabelGenerator;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
import org.slf4j.Logger;
@@ -37,32 +36,24 @@ public class ConnectWorkerDescriptionProvider {
private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerDescriptionProvider.class);
- public ConnectWorkerContainer getContainerDescription(String endpointUrl) {
+ public ConnectWorkerContainer getContainerDescription(String serviceGroup) {
List<AdapterDescription> adapters = new ArrayList<>();
for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
- AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
+ AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel());
adapters.add(desc);
}
List<ProtocolDescription> protocols = new ArrayList<>();
for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
- ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
+ ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel());
protocols.add(desc);
}
- return new ConnectWorkerContainer(endpointUrl, protocols, adapters);
+ return new ConnectWorkerContainer(serviceGroup, protocols, adapters);
}
- private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity, String endpointUrl) {
- if (!(entity instanceof GenericAdapterDescription)) {
- if (entity instanceof ProtocolDescription) {
- entity.setElementId(endpointUrl + "protocol/" + entity.getElementId());
- } else if (entity instanceof AdapterDescription) {
- entity.setElementId(endpointUrl + "adapter/" + entity.getElementId());
- }
- }
-
+ private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity) {
// TODO remove after full internationalization support has been implemented
if (entity.isIncludesLocales()) {
LabelGenerator lg = new LabelGenerator(entity);
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index ab89a05..e23d1ed 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.connect.container.worker.init;
import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
-import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,18 +28,14 @@ public class ConnectWorkerRegistrationService {
private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerRegistrationService.class);
- public void registerWorker() {
+ public void registerWorker(String serviceGroup) {
String masterUrl = getConnectMasterUrl() + "/streampipes-backend";
- String workerUrl = "http://"
- + DeclarersSingleton.getInstance().getHostName()
- + ":" + DeclarersSingleton.getInstance().getPort() + "/";
-
boolean connected = false;
while (!connected) {
LOG.info("Trying to connect to master: " + masterUrl);
connected = MasterRestClient.register(masterUrl,
- new ConnectWorkerDescriptionProvider().getContainerDescription(workerUrl));
+ new ConnectWorkerDescriptionProvider().getContainerDescription(serviceGroup));
if (!connected) {
LOG.info("Retrying in 5 seconds");
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
index d261c89..107cfaa 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.util.ServiceDefinitionUtil;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,7 +36,7 @@ public class ConnectWorkerTagProvider {
Collection<IProtocol> protocols = DeclarersSingleton.getInstance().getAllProtocols();
tags.addAll(ServiceDefinitionUtil.extractAppIdsFromAdapters(adapters));
tags.addAll(ServiceDefinitionUtil.extractAppIdsFromProtocols(protocols));
- tags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.CONNECT_WORKER));
+ tags.add(DefaultSpServiceTags.CONNECT_WORKER);
return tags;
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
index 724924d..0c0d4f9 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
@@ -34,7 +34,7 @@ import java.io.IOException;
import java.net.URL;
import java.util.List;
-@Path("/api/v1/{username}/worker/adapters")
+@Path("/api/v1/worker/adapters")
public class AdapterResource extends AbstractSharedRestInterface {
private AdapterWorkerManagement adapterWorkerManagement;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 8642e86..228dac8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -36,7 +36,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/api/v1/{username}/worker/guess")
+@Path("/api/v1/worker/guess")
public class GuessResource extends AbstractSharedRestInterface {
private static final Logger logger = LoggerFactory.getLogger(GuessResource.class);
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
index edcd7d8..7a4f1b3 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
@@ -24,7 +24,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
-@Path("/api/v1/{username}/worker/live")
+@Path("/api/v1/worker/live")
public class HttpServerAdapterResource {
@POST
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
index 5d8fade..5dbd005 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
@@ -34,7 +34,7 @@ import java.io.IOException;
import java.net.URL;
import java.util.List;
-@Path("/api/v1/{username}/worker/protocols")
+@Path("/api/v1/worker/protocols")
public class ProtocolResource extends AbstractSharedRestInterface {
private AdapterWorkerManagement adapterWorkerManagement;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
index 0f3d17f..dd8417f 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
@@ -32,7 +32,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
-@Path("/api/v1/{username}/worker/resolvable")
+@Path("/api/v1/worker/resolvable")
public class RuntimeResolvableResource extends AbstractSharedRestInterface {
@POST
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
index 7237360..c07ec73 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
@@ -32,7 +32,7 @@ import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collection;
-@Path("/")
+@Path("/worker")
public class WelcomePageWorker extends AbstractSharedRestInterface {
private String id;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
index 1397fd2..cbc0537 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
@@ -35,7 +35,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/api/v1/{username}/worker")
+@Path("/api/v1/worker")
public class WorkerResource extends AbstractSharedRestInterface {
private static final Logger logger = LoggerFactory.getLogger(WorkerResource.class);
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index e915775..6ebf29c 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.container.base;
import org.apache.streampipes.commons.networking.Networking;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,13 +54,17 @@ public abstract class StreamPipesServiceBase {
private void registerService(String serviceGroup,
String serviceId,
Integer defaultPort) throws UnknownHostException {
+ SpServiceRegistrationRequest req = SpServiceRegistrationRequest.from(
+ serviceGroup,
+ serviceId,
+ getHostname(),
+ getPort(defaultPort),
+ getServiceTags(),
+ getHealthCheckPath());
+
SpServiceDiscovery
.getServiceDiscovery()
- .registerService(serviceGroup,
- serviceId,
- getHostname(),
- getPort(defaultPort),
- getServiceTags());
+ .registerService(req);
}
protected String getHostname() throws UnknownHostException {
@@ -77,4 +82,8 @@ public abstract class StreamPipesServiceBase {
SpServiceDiscovery.getServiceDiscovery().deregisterService(serviceId);
}
+ protected String getHealthCheckPath() { // value is handed to SpServiceRegistrationRequest.from(...) in registerService
+ return ""; // default is the empty string; subclasses override to supply their own path
+ }
+
}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index ac9f572..82addbe 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -50,7 +50,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
@Override
public void afterServiceRegistered(SpServiceDefinition serviceDef) {
- new ConnectWorkerRegistrationService().registerWorker();
+ new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
}
@Override
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
index 44ff9ba..793e8ef 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.util.ServiceDefinitionUtil;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
import java.util.Collection;
import java.util.List;
@@ -32,7 +31,7 @@ public class PipelineElementServiceTagProvider {
public List<SpServiceTag> extractServiceTags() {
Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
List<SpServiceTag> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
- serviceTags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.PE));
+ serviceTags.add(DefaultSpServiceTags.PE);
return serviceTags;
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
index 34f64dd..3d33348 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.container.api;
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -31,7 +31,7 @@ import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import javax.ws.rs.Path;
import java.util.Map;
-@Path(PipelineElementPrefix.DATA_PROCESSOR)
+@Path(SpServicePathPrefix.DATA_PROCESSOR)
public class DataProcessorPipelineElementResource extends InvocablePipelineElementResource<DataProcessorInvocation,
SemanticEventProcessingAgentDeclarer, ProcessingElementParameterExtractor> {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
index ae46ae1..1303216 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.container.api;
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
import org.apache.streampipes.container.declarer.SemanticEventConsumerDeclarer;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -30,7 +30,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import javax.ws.rs.Path;
import java.util.Map;
-@Path(PipelineElementPrefix.DATA_SINK)
+@Path(SpServicePathPrefix.DATA_SINK)
public class DataSinkPipelineElementResource extends InvocablePipelineElementResource<DataSinkInvocation,
SemanticEventConsumerDeclarer, DataSinkParameterExtractor> {
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
index 063a823..23421e0 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.container.api;
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
import org.apache.streampipes.container.assets.AssetZipGenerator;
import org.apache.streampipes.container.declarer.DataSetDeclarer;
import org.apache.streampipes.container.declarer.DataStreamDeclarer;
@@ -33,7 +33,7 @@ import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Map;
-@Path(PipelineElementPrefix.DATA_STREAM)
+@Path(SpServicePathPrefix.DATA_STREAM)
public class DataStreamPipelineElementResource extends AbstractPipelineElementResource<DataStreamDeclarer> {
@Override
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
index ecbce27..140c1fc 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.sdk.utils.Assets;
import java.io.IOException;
import java.util.ArrayList;
@@ -72,8 +73,10 @@ public class WelcomePageGenerator {
desc.setElementId(entity.getElementId());
desc.setAppId(entity.getAppId());
desc.setEditable(!(entity.isInternallyManaged()));
- desc.setIncludesDocs(entity.isIncludesAssets());
- desc.setIncludesIcon(entity.isIncludesAssets());
+ desc.setIncludesDocs(entity.isIncludesAssets()
+ && entity.getIncludedAssets().contains(Assets.DOCUMENTATION));
+ desc.setIncludesIcon(entity.isIncludesAssets()
+ && entity.getIncludedAssets().contains(Assets.ICON));
String uri = baseUri;
if (declarer instanceof SemanticEventConsumerDeclarer) {
uri += "sec/";
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
similarity index 91%
rename from streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java
rename to streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
index 74c07e1..f164d80 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java
+++ b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
@@ -20,18 +20,18 @@ package org.apache.streampipes.model.client.endpoint;
import com.google.gson.annotations.SerializedName;
-public class RdfEndpoint {
+public class ExtensionsServiceEndpoint {
private @SerializedName("_id") String id;
private @SerializedName("_rev") String rev;
private String endpointUrl;
- public RdfEndpoint() {
+ public ExtensionsServiceEndpoint() {
}
- public RdfEndpoint(String endpointUrl) {
+ public ExtensionsServiceEndpoint(String endpointUrl) {
this.endpointUrl = endpointUrl;
}
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
similarity index 90%
rename from streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
rename to streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
index 54863ab..12dc8e9 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
+++ b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
import java.util.List;
@TsModel
-public class RdfEndpointItem {
+public class ExtensionsServiceEndpointItem {
private String name;
private String description;
@@ -38,9 +38,9 @@ public class RdfEndpointItem {
private boolean installed;
private boolean editable;
- private List<RdfEndpointItem> streams;
+ private List<ExtensionsServiceEndpointItem> streams;
- public RdfEndpointItem() {
+ public ExtensionsServiceEndpointItem() {
}
@@ -68,11 +68,11 @@ public class RdfEndpointItem {
this.uri = uri;
}
- public List<RdfEndpointItem> getStreams() {
+ public List<ExtensionsServiceEndpointItem> getStreams() {
return streams;
}
- public void setStreams(List<RdfEndpointItem> streams) {
+ public void setStreams(List<ExtensionsServiceEndpointItem> streams) {
this.streams = streams;
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index d434e68..a6c99a1 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -67,6 +67,8 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
private long createdAt;
+ private String selectedEndpointUrl;
+
public AdapterDescription() {
super();
this.rules = new ArrayList<>();
@@ -103,6 +105,7 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
this.icon = other.getIcon();
this.category = new Cloner().epaTypes(other.getCategory());
this.createdAt = other.getCreatedAt();
+ this.selectedEndpointUrl = other.getSelectedEndpointUrl();
if (other.getEventGrounding() != null) this.eventGrounding = new EventGrounding(other.getEventGrounding());
}
@@ -236,4 +239,12 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
public void setCreatedAt(long createdAt) {
this.createdAt = createdAt;
}
+
+ public String getSelectedEndpointUrl() { // plain accessor for the selectedEndpointUrl field
+ return selectedEndpointUrl; // NOTE(review): field has no initializer, so presumably null until set - confirm
+ }
+
+ public void setSelectedEndpointUrl(String selectedEndpointUrl) { // plain mutator; the value is stored as given, without validation
+ this.selectedEndpointUrl = selectedEndpointUrl;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
index f41ad26..d6b2cff 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
@@ -22,6 +22,7 @@ import com.google.gson.annotations.SerializedName;
import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.util.ElementIdGenerator;
import java.util.ArrayList;
import java.util.List;
@@ -30,30 +31,34 @@ public class ConnectWorkerContainer extends UnnamedStreamPipesEntity {
private @SerializedName("_rev") String rev;
+ private String serviceGroup;
+
+ private List<ProtocolDescription> protocols;
+
+ private List<AdapterDescription> adapters;
+
public ConnectWorkerContainer() {
super();
this.adapters = new ArrayList<>();
this.protocols = new ArrayList<>();
}
- private String endpointUrl;
-
- private List<ProtocolDescription> protocols;
-
- private List<AdapterDescription> adapters;
-
- public ConnectWorkerContainer(String endpointUrl, List<ProtocolDescription> protocols, List<AdapterDescription> adapters) {
- this.endpointUrl = endpointUrl;
+ public ConnectWorkerContainer(String serviceGroup,
+ List<ProtocolDescription> protocols,
+ List<AdapterDescription> adapters) { // replaces the former endpointUrl-based constructor
+ super();
+ this.elementId = ElementIdGenerator.makeElementIdFromAppId(serviceGroup); // element ID is derived from the service group
+ this.serviceGroup = serviceGroup;
+ this.protocols = protocols; // lists are stored by reference, not copied
+ this.adapters = adapters;
+ }
- public String getEndpointUrl() {
- return endpointUrl;
+ public String getServiceGroup() {
+ return serviceGroup;
}
- public void setEndpointUrl(String endpointUrl) {
- this.endpointUrl = endpointUrl;
+ public void setServiceGroup(String serviceGroup) {
+ this.serviceGroup = serviceGroup;
}
public List<ProtocolDescription> getProtocols() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
index 8aa2e21..dca69f2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
@@ -21,14 +21,13 @@ package org.apache.streampipes.model.dashboard;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.google.gson.annotations.SerializedName;
-import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
@JsonSubTypes({
@JsonSubTypes.Type(DashboardWidgetModel.class),
@JsonSubTypes.Type(DataExplorerWidgetModel.class)
})
-public abstract class DashboardEntity extends UnnamedStreamPipesEntity {
+public abstract class DashboardEntity {
@JsonProperty("_id")
private @SerializedName("_id") String id;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
index 06d0891..e0b50b5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
@@ -18,8 +18,9 @@
package org.apache.streampipes.manager.assets;
import org.apache.http.client.fluent.Request;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
-import org.apache.streampipes.manager.execution.http.PipelineElementEndpointGenerator;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+import org.apache.streampipes.manager.execution.http.ExtensionsServiceEndpointGenerator;
import java.io.IOException;
import java.io.InputStream;
@@ -28,17 +29,17 @@ public class AssetFetcher {
private static final String ASSET_ENDPOINT_APPENDIX = "/assets";
- private PipelineElementUrl pipelineElementUrl;
+ private SpServiceUrlProvider spServiceUrlProvider;
private String appId;
- public AssetFetcher(PipelineElementUrl pipelineElementUrl,
+ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider,
String appId) {
- this.pipelineElementUrl = pipelineElementUrl;
+ this.spServiceUrlProvider = spServiceUrlProvider;
this.appId = appId;
}
- public InputStream fetchPipelineElementAssets() throws IOException {
- String endpointUrl = new PipelineElementEndpointGenerator(appId, pipelineElementUrl).getEndpointResourceUrl();
+ public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException {
+ String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl();
return Request
.Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
.execute()
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
index 19096c7..8fd172b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
@@ -19,7 +19,8 @@ package org.apache.streampipes.manager.assets;
import org.apache.commons.io.FileUtils;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import java.io.File;
import java.io.IOException;
@@ -42,9 +43,9 @@ public class AssetManager {
return Files.readAllBytes(Paths.get(getAssetPath(appId, assetName)));
}
- public static void storeAsset(PipelineElementUrl pipelineElementUrl,
- String appId) throws IOException {
- InputStream assetStream = new AssetFetcher(pipelineElementUrl, appId)
+ public static void storeAsset(SpServiceUrlProvider spServiceUrlProvider,
+ String appId) throws IOException, NoServiceEndpointsAvailableException {
+ InputStream assetStream = new AssetFetcher(spServiceUrlProvider, appId)
.fetchPipelineElementAssets();
new AssetExtractor(assetStream, appId).extractAssetContents();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index a518056..1aa54c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.manager.endpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
@@ -29,21 +29,21 @@ import java.util.stream.Stream;
public class EndpointFetcher {
- public List<RdfEndpoint> getEndpoints() {
- List<String> endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePeEndpoints();
- List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
+ public List<ExtensionsServiceEndpoint> getEndpoints() {
+ List<String> endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePipelineElementEndpoints();
+ List<ExtensionsServiceEndpoint> servicerdExtensionsServiceEndpoints = new LinkedList<>();
for (String endpoint : endpoints) {
- RdfEndpoint rdfEndpoint =
- new RdfEndpoint(endpoint);
- servicerdRdfEndpoints.add(rdfEndpoint);
+ ExtensionsServiceEndpoint extensionsServiceEndpoint =
+ new ExtensionsServiceEndpoint(endpoint);
+ servicerdExtensionsServiceEndpoints.add(extensionsServiceEndpoint);
}
- List<RdfEndpoint> databasedRdfEndpoints = StorageDispatcher.INSTANCE.getNoSqlStore()
+ List<ExtensionsServiceEndpoint> databasedExtensionsServiceEndpoints = StorageDispatcher.INSTANCE.getNoSqlStore()
.getRdfEndpointStorage()
- .getRdfEndpoints();
+ .getExtensionsServiceEndpoints();
- List<RdfEndpoint> concatList =
- Stream.of(databasedRdfEndpoints, servicerdRdfEndpoints)
+ List<ExtensionsServiceEndpoint> concatList =
+ Stream.of(databasedExtensionsServiceEndpoints, servicerdExtensionsServiceEndpoints)
.flatMap(Collection::stream)
.collect(Collectors.toList());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
index 9986b15..a6745d2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.manager.endpoint;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.http.client.fluent.Request;
import org.apache.http.message.BasicHeader;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,19 +35,19 @@ import java.util.List;
public class EndpointItemFetcher {
Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class);
- private List<RdfEndpoint> rdfEndpoints;
+ private List<ExtensionsServiceEndpoint> extensionsServiceEndpoints;
- public EndpointItemFetcher(List<RdfEndpoint> rdfEndpoints) {
- this.rdfEndpoints = rdfEndpoints;
+ public EndpointItemFetcher(List<ExtensionsServiceEndpoint> extensionsServiceEndpoints) {
+ this.extensionsServiceEndpoints = extensionsServiceEndpoints;
}
- public List<RdfEndpointItem> getItems() {
- List<RdfEndpointItem> endpointItems = new ArrayList<>();
- rdfEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
+ public List<ExtensionsServiceEndpointItem> getItems() {
+ List<ExtensionsServiceEndpointItem> endpointItems = new ArrayList<>();
+ extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
return endpointItems;
}
- private List<RdfEndpointItem> getEndpointItems(RdfEndpoint e) {
+ private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
try {
String result = Request.Get(e.getEndpointUrl())
.addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON))
@@ -56,7 +56,7 @@ public class EndpointItemFetcher {
.returnContent()
.asString();
- return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference<List<RdfEndpointItem>>() {});
+ return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference<List<ExtensionsServiceEndpointItem>>() {});
} catch (IOException e1) {
logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl());
return new ArrayList<>();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java
new file mode 100644
index 0000000..4f73d8d
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Resolves the URL of an extensions service for a pipeline element, based on the
+ * endpoints that service discovery reports for the element's service tag.
+ */
+public class ExtensionsServiceEndpointGenerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceEndpointGenerator.class);
+
+  private final String appId;
+  private final SpServiceUrlProvider spServiceUrlProvider;
+
+  /**
+   * Creates a generator for a single pipeline element.
+   *
+   * @param appId app id of the element whose service should be located
+   * @param spServiceUrlProvider provides the service tag and the invocation URL for the app id
+   */
+  public ExtensionsServiceEndpointGenerator(String appId,
+      SpServiceUrlProvider spServiceUrlProvider) {
+    this.appId = appId;
+    this.spServiceUrlProvider = spServiceUrlProvider;
+  }
+
+  /**
+   * Builds the invocation URL for the app id on the selected service endpoint.
+   *
+   * @return the URL the URL provider derives from the selected endpoint and the app id
+   * @throws NoServiceEndpointsAvailableException if service discovery returns no endpoint
+   */
+  public String getEndpointResourceUrl() throws NoServiceEndpointsAvailableException {
+    return spServiceUrlProvider.getInvocationUrl(selectService(), appId);
+  }
+
+  /**
+   * Returns the selected service endpoint exactly as reported by service discovery.
+   *
+   * @return the selected endpoint
+   * @throws NoServiceEndpointsAvailableException if service discovery returns no endpoint
+   */
+  public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException {
+    return selectService();
+  }
+
+  /**
+   * Asks service discovery for endpoints in the EXT service group that carry the
+   * service tag derived from the app id.
+   */
+  private List<String> getServiceEndpoints() {
+    // NOTE(review): the boolean argument presumably restricts results to healthy services - confirm.
+    return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+        Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString()));
+  }
+
+  /**
+   * Selects the first endpoint returned by service discovery.
+   *
+   * @throws NoServiceEndpointsAvailableException if service discovery returns no endpoint
+   */
+  private String selectService() throws NoServiceEndpointsAvailableException {
+    // Query discovery once and reuse the list; a second lookup may differ from the one just checked.
+    List<String> serviceEndpoints = getServiceEndpoints();
+    if (serviceEndpoints.isEmpty()) {
+      LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, this.spServiceUrlProvider.getServiceTag(appId));
+      throw new NoServiceEndpointsAvailableException("Could not find any matching service endpoints");
+    }
+    return serviceEndpoints.get(0);
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
deleted file mode 100644
index 3449f9e..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class PipelineElementEndpointGenerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(PipelineElementEndpointGenerator.class);
-
- private String appId;
- private PipelineElementUrl pipelineElementUrl;
-
- public PipelineElementEndpointGenerator(String appId,
- PipelineElementUrl pipelineElementUrl) {
- this.appId = appId;
- this.pipelineElementUrl = pipelineElementUrl;
- }
-
- public String getEndpointResourceUrl() {
- List<String> endpoints = getServiceEndpoints();
- return pipelineElementUrl.getInvocationUrl(selectService(endpoints), appId);
- }
-
- private List<String> getServiceEndpoints() {
- return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints("pe", true, Collections.singletonList(this.appId));
- }
-
- private String selectService(List<String> availableServices) {
- return availableServices.get(0);
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index 8cae245..64cd83c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,7 +18,9 @@
package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
import org.apache.streampipes.manager.secret.SecretProvider;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
@@ -80,31 +82,56 @@ public class PipelineExecutor {
decryptSecrets(graphs);
- graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(
- g.getAppId(),
- getPipelineElementType(g))
- .getEndpointResourceUrl()));
+ List<InvocableStreamPipesEntity> failedServices = new ArrayList<>();
+ graphs.forEach(g -> {
+ try {
+ g.setSelectedEndpointUrl(findSelectedEndpoint(g));
+ } catch (NoServiceEndpointsAvailableException e) {
+ failedServices.add(g);
+ }
+ });
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), graphs, dataSets)
- .invokeGraphs();
+ PipelineOperationStatus status;
+ if (failedServices.size() == 0) {
- encryptSecrets(graphs);
+ status = new GraphSubmitter(pipeline.getPipelineId(),
+ pipeline.getName(), graphs, dataSets)
+ .invokeGraphs();
- if (status.isSuccess()) {
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+ encryptSecrets(graphs);
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+ if (status.isSuccess()) {
+ storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
- if (storeStatus) {
- pipeline.setHealthStatus(PipelineHealthStatus.OK);
- setPipelineStarted(pipeline);
+ PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+ new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+
+ if (storeStatus) {
+ pipeline.setHealthStatus(PipelineHealthStatus.OK);
+ setPipelineStarted(pipeline);
+ }
}
+ } else {
+ List<PipelineElementStatus> pe = failedServices.stream().map(fs ->
+ new PipelineElementStatus(fs.getElementId(),
+ fs.getName(),
+ false,
+ "No active supporting service found")).collect(Collectors.toList());
+ status = new PipelineOperationStatus(pipeline.getPipelineId(),
+ pipeline.getName(),
+ "Could not start pipeline " + pipeline.getName() + ".",
+ pe);
}
return status;
}
+ private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+ return new ExtensionsServiceEndpointGenerator(
+ g.getAppId(),
+ getPipelineElementType(g))
+ .getEndpointResourceUrl();
+ }
+
private void updateGroupIds(InvocableStreamPipesEntity entity) {
entity.getInputStreams()
.stream()
@@ -180,8 +207,8 @@ public class PipelineExecutor {
return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
}
- private PipelineElementUrl getPipelineElementType(InvocableStreamPipesEntity entity) {
- return entity instanceof DataProcessorInvocation ? PipelineElementUrl.DATA_PROCESSOR : PipelineElementUrl.DATA_SINK;
+ private SpServiceUrlProvider getPipelineElementType(InvocableStreamPipesEntity entity) {
+ return entity instanceof DataProcessorInvocation ? SpServiceUrlProvider.DATA_PROCESSOR : SpServiceUrlProvider.DATA_SINK;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index c7f0295..1c21088 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -36,8 +36,8 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.model.message.DataSetModificationMessage;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.PipelineModificationMessage;
@@ -150,7 +150,7 @@ public class Operations {
return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
}
- public static List<RdfEndpointItem> getEndpointUriContents(List<RdfEndpoint> endpoints) {
+ public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
return new EndpointItemFetcher(endpoints).getItems();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index 98ccd81..7c29181 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -18,10 +18,10 @@
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.storage.couchdb.impl.RdfEndpointStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.ExtensionsServiceEndpointStorageImpl;
import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.lightcouch.DesignDocument;
import org.lightcouch.DesignDocument.MapReduce;
@@ -94,10 +94,10 @@ public class CouchDbInstallationStep implements InstallationStep {
}
private Message addRdfEndpoints() {
- RdfEndpointStorageImpl rdfEndpointStorage = new RdfEndpointStorageImpl();
+ ExtensionsServiceEndpointStorageImpl rdfEndpointStorage = new ExtensionsServiceEndpointStorageImpl();
initRdfEndpointPorts
.forEach(p -> rdfEndpointStorage
- .addRdfEndpoint(new RdfEndpoint(initRdfEndpointHost + p)));
+ .addExtensionsServiceEndpoint(new ExtensionsServiceEndpoint(initRdfEndpointHost + p)));
return Notifications.success("Discovering pipeline element endpoints...");
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
index 461446b..d6ad3e4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.manager.setup;
import org.apache.streampipes.manager.endpoint.EndpointFetcher;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import org.apache.streampipes.model.client.setup.InitialSettings;
import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class InstallationConfiguration {
steps.add(new UserRegistrationInstallationStep(settings.getAdminEmail(), settings.getAdminPassword()));
if (settings.getInstallPipelineElements()) {
- for(RdfEndpoint endpoint : new EndpointFetcher().getEndpoints()) {
+ for(ExtensionsServiceEndpoint endpoint : new EndpointFetcher().getEndpoints()) {
steps.add(new PipelineElementInstallationStep(endpoint, settings.getAdminEmail()));
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
index d00a66c..109bc69 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
@@ -19,8 +19,8 @@ package org.apache.streampipes.manager.setup;
import org.apache.streampipes.manager.endpoint.EndpointItemParser;
import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notifications;
@@ -30,10 +30,10 @@ import java.util.List;
public class PipelineElementInstallationStep implements InstallationStep {
- private RdfEndpoint endpoint;
+ private ExtensionsServiceEndpoint endpoint;
private String userEmail;
- public PipelineElementInstallationStep(RdfEndpoint endpoint, String userEmail) {
+ public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint, String userEmail) {
this.endpoint = endpoint;
this.userEmail = userEmail;
}
@@ -41,8 +41,8 @@ public class PipelineElementInstallationStep implements InstallationStep {
@Override
public List<Message> install() {
List<Message> statusMessages = new ArrayList<>();
- List<RdfEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
- for(RdfEndpointItem item : items) {
+ List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
+ for(ExtensionsServiceEndpointItem item : items) {
statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(),
userEmail, true, false));
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
index e18fd32..f0fd68a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.manager.verification;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.graph.DataProcessorDescription;
@@ -65,14 +66,14 @@ public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescript
}
@Override
- protected void storeAssets() throws IOException {
+ protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(PipelineElementUrl.DATA_PROCESSOR, elementDescription.getAppId());
+ AssetManager.storeAsset(SpServiceUrlProvider.DATA_PROCESSOR, elementDescription.getAppId());
}
}
@Override
- protected void updateAssets() throws IOException {
+ protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.deleteAsset(elementDescription.getAppId());
storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
index a8c6060..6ff2721 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.manager.verification;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -74,14 +75,14 @@ public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
}
@Override
- protected void storeAssets() throws IOException {
+ protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(PipelineElementUrl.DATA_SINK, elementDescription.getAppId());
+ AssetManager.storeAsset(SpServiceUrlProvider.DATA_SINK, elementDescription.getAppId());
}
}
@Override
- protected void updateAssets() throws IOException {
+ protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.deleteAsset(elementDescription.getAppId());
storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
index b7d1b90..337acc9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.manager.verification;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.SpDataStream;
@@ -71,14 +72,14 @@ public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
}
@Override
- protected void storeAssets() throws IOException {
+ protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(PipelineElementUrl.DATA_STREAM, elementDescription.getAppId());
+ AssetManager.storeAsset(SpServiceUrlProvider.DATA_STREAM, elementDescription.getAppId());
}
}
@Override
- protected void updateAssets() throws IOException {
+ protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.deleteAsset(elementDescription.getAppId());
storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
index e928634..554ea6e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.manager.verification;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.storage.UserManagementService;
import org.apache.streampipes.manager.storage.UserService;
@@ -97,7 +98,7 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
if (state == StorageState.STORED) {
try {
storeAssets();
- } catch (IOException e) {
+ } catch (IOException | NoServiceEndpointsAvailableException e) {
e.printStackTrace();
}
return successMessage();
@@ -123,7 +124,7 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
update(username);
try {
updateAssets();
- } catch (IOException e) {
+ } catch (IOException | NoServiceEndpointsAvailableException e) {
e.printStackTrace();
}
return successMessage();
@@ -133,9 +134,9 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
}
- protected abstract void storeAssets() throws IOException;
+ protected abstract void storeAssets() throws IOException, NoServiceEndpointsAvailableException;
- protected abstract void updateAssets() throws IOException;
+ protected abstract void updateAssets() throws IOException, NoServiceEndpointsAvailableException;
private Message errorMessage() {
return new ErrorMessage(elementDescription.getName(), collectNotifications());
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
similarity index 76%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
rename to streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
index e80720d..c5dc776 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
@@ -23,9 +23,10 @@ import org.apache.streampipes.manager.endpoint.EndpointFetcher;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
-import org.apache.streampipes.storage.api.IRdfEndpointStorage;
+import org.apache.streampipes.storage.api.IExtensionsServiceEndpointStorage;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@@ -36,7 +37,7 @@ import java.util.List;
import java.util.stream.Collectors;
@Path("/v2/users/{username}/rdfendpoints")
-public class RdfEndpoint extends AbstractRestResource {
+public class ExtensionsServiceEndpointResource extends AbstractRestResource {
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -49,9 +50,9 @@ public class RdfEndpoint extends AbstractRestResource {
@POST
@Produces(MediaType.APPLICATION_JSON)
@GsonWithIds
- public Response addRdfEndpoint(org.apache.streampipes.model.client.endpoint.RdfEndpoint rdfEndpoint) {
+ public Response addRdfEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint) {
getRdfEndpointStorage()
- .addRdfEndpoint(rdfEndpoint);
+ .addExtensionsServiceEndpoint(extensionsServiceEndpoint);
return Response.status(Response.Status.OK).build();
}
@@ -64,7 +65,7 @@ public class RdfEndpoint extends AbstractRestResource {
@GsonWithIds
public Response removeRdfEndpoint(@PathParam("rdfEndpointId") String rdfEndpointId) {
getRdfEndpointStorage()
- .removeRdfEndpoint(rdfEndpointId);
+ .removeExtensionsServiceEndpoint(rdfEndpointId);
return Response.status(Response.Status.OK).build();
}
@@ -74,9 +75,9 @@ public class RdfEndpoint extends AbstractRestResource {
@Produces(MediaType.APPLICATION_JSON)
@GsonWithIds
public Response getEndpointContents(@PathParam("username") String username) {
- List<org.apache.streampipes.model.client.endpoint.RdfEndpoint> endpoints = getEndpoints();
+ List<ExtensionsServiceEndpoint> endpoints = getEndpoints();
- List<RdfEndpointItem> items = Operations.getEndpointUriContents(endpoints);
+ List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(endpoints);
items.forEach(item -> item.setInstalled(isInstalled(item.getElementId(), username)));
// also add installed elements that are currently not running or available
@@ -90,7 +91,7 @@ public class RdfEndpoint extends AbstractRestResource {
@POST
@Path("/items/icon")
@Produces("image/png")
- public Response getEndpointItemIcon(RdfEndpointItem endpointItem) {
+ public Response getEndpointItemIcon(ExtensionsServiceEndpointItem endpointItem) {
try {
byte[] imageBytes = Request.Get(makeIconUrl(endpointItem)).execute().returnContent().asBytes();
return ok(imageBytes);
@@ -99,11 +100,11 @@ public class RdfEndpoint extends AbstractRestResource {
}
}
- private String makeIconUrl(RdfEndpointItem endpointItem) {
+ private String makeIconUrl(ExtensionsServiceEndpointItem endpointItem) {
return endpointItem.getUri() + "/assets/icon";
}
- private List<org.apache.streampipes.model.client.endpoint.RdfEndpoint> getEndpoints() {
+ private List<ExtensionsServiceEndpoint> getEndpoints() {
return new EndpointFetcher().getEndpoints();
}
@@ -121,25 +122,25 @@ public class RdfEndpoint extends AbstractRestResource {
return elementUris;
}
- private List<RdfEndpointItem> getAllDataStreamEndpoints(String username, List<RdfEndpointItem> existingItems) {
+ private List<ExtensionsServiceEndpointItem> getAllDataStreamEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) {
return getAllDataStreamUris(username)
.stream()
- .filter(s -> existingItems.stream().noneMatch(item -> item.getElementId().equals(s)))
+ .filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId())))
.map(s -> getPipelineElementStorage().getDataStreamById(s))
.map(stream -> makeItem(stream, stream instanceof SpDataSet ? "set" : "stream"))
.collect(Collectors.toList());
}
- private List<RdfEndpointItem> getAllDataProcessorEndpoints(String username, List<RdfEndpointItem> existingItems) {
+ private List<ExtensionsServiceEndpointItem> getAllDataProcessorEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) {
return getAllDataProcessorUris(username)
.stream()
- .filter(s -> existingItems.stream().noneMatch(item -> item.getElementId().equals(s)))
+ .filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId())))
.map(s -> getPipelineElementStorage().getDataProcessorById(s))
.map(source -> makeItem(source, "sepa"))
.collect(Collectors.toList());
}
- private List<RdfEndpointItem> getAllDataSinkEndpoints(String username, List<RdfEndpointItem> existingItems) {
+ private List<ExtensionsServiceEndpointItem> getAllDataSinkEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) {
return getAllDataSinkUris(username)
.stream()
.filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId())))
@@ -148,8 +149,8 @@ public class RdfEndpoint extends AbstractRestResource {
.collect(Collectors.toList());
}
- private RdfEndpointItem makeItem(NamedStreamPipesEntity entity, String type) {
- RdfEndpointItem endpoint = new RdfEndpointItem();
+ private ExtensionsServiceEndpointItem makeItem(NamedStreamPipesEntity entity, String type) {
+ ExtensionsServiceEndpointItem endpoint = new ExtensionsServiceEndpointItem();
endpoint.setInstalled(true);
endpoint.setDescription(entity.getDescription());
endpoint.setName(entity.getName());
@@ -172,7 +173,7 @@ public class RdfEndpoint extends AbstractRestResource {
return getUserService().getOwnActionUris(username);
}
- private IRdfEndpointStorage getRdfEndpointStorage() {
+ private IExtensionsServiceEndpointStorage getRdfEndpointStorage() {
return getNoSqlStorage().getRdfEndpointStorage();
}
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
index 2d6630e..40be32b 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.svcdiscovery.api;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
import java.util.List;
import java.util.Map;
@@ -33,14 +33,21 @@ public interface ISpServiceDiscovery {
* @param port port of service endpoint
* @param tags tags of service
*/
- void registerService(String svcGroup, String svcId, String host, int port, List<SpServiceTag> tags);
+ void registerService(SpServiceRegistrationRequest serviceRegistrationRequest);
/**
* Get active pipeline element service endpoints
*
* @return list of pipeline element endpoints
*/
- List<String> getActivePeEndpoints();
+ List<String> getActivePipelineElementEndpoints();
+
+ /**
+ * Get active StreamPipes Connect worker endpoints
+ *
+ * @return list of StreamPipes Connect worker endpoints
+ */
+ List<String> getActiveConnectWorkerEndpoints();
/**
* Get service endpoints
@@ -50,7 +57,8 @@ public interface ISpServiceDiscovery {
* @param filterByTags filter param to filter list of registered services
* @return list of services
*/
- List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+ List<String> getServiceEndpoints(String svcGroup,
+ boolean restrictToHealthy,
List<String> filterByTags);
/**
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
index d263f07..5fd3452 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
@@ -20,5 +20,6 @@ package org.apache.streampipes.svcdiscovery.api.model;
public class DefaultSpServiceGroups {
public static final String CORE = "core";
+ public static final String EXT = "ext";
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
index 093df8c..fc4f980 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
@@ -19,9 +19,9 @@ package org.apache.streampipes.svcdiscovery.api.model;
public class DefaultSpServiceTags {
- public static final String CORE = "core";
- public static final String PE = "pe";
- public static final String CONNECT_MASTER = "connect-master";
- public static final String CONNECT_WORKER = "connect-worker";
- public static final String STREAMPIPES_CLIENT = "streampipes-client";
+ public static final SpServiceTag CORE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "core");
+ public static final SpServiceTag PE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "pe");
+ public static final SpServiceTag CONNECT_MASTER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-master");
+ public static final SpServiceTag CONNECT_WORKER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-worker");
+ public static final SpServiceTag STREAMPIPES_CLIENT = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "streampipes-client");
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
similarity index 81%
rename from streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
index d391381..970ca73 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
@@ -15,14 +15,15 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.svcdiscovery.api.model;
-public class PipelineElementPrefix {
+public class SpServicePathPrefix {
public static final String DATA_PROCESSOR = "sepa";
public static final String DATA_SINK = "sec";
public static final String DATA_STREAM = "stream";
public static final String DATA_SET = "set";
- public static final String ADAPTER = "adapter";
+ public static final String ADAPTER = "api/v1/worker/adapters";
+ public static final String PROTOCOL = "api/v1/worker/protocols";
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
new file mode 100644
index 0000000..4926cdd
--- /dev/null
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.svcdiscovery.api.model;
+
+import java.util.List;
+
+public class SpServiceRegistrationRequest {
+
+ private String svcGroup;
+ private String svcId;
+ private String host;
+ private int port;
+ private List<SpServiceTag> tags;
+ private String healthCheckPath;
+
+ public static SpServiceRegistrationRequest from(String svcGroup,
+ String svcId,
+ String host,
+ Integer port,
+ List<SpServiceTag> tags) {
+ return new SpServiceRegistrationRequest(svcGroup, svcId, host, port, tags, "");
+ }
+
+ public static SpServiceRegistrationRequest from(String svcGroup,
+ String svcId,
+ String host,
+ Integer port,
+ List<SpServiceTag> tags,
+ String healthCheckPath) {
+ return new SpServiceRegistrationRequest(svcGroup, svcId, host, port, tags, healthCheckPath);
+ }
+
+ public SpServiceRegistrationRequest(String svcGroup,
+ String svcId,
+ String host,
+ int port,
+ List<SpServiceTag> tags,
+ String healthCheckPath) {
+ this.svcGroup = svcGroup;
+ this.svcId = svcId;
+ this.host = host;
+ this.port = port;
+ this.tags = tags;
+ this.healthCheckPath = healthCheckPath;
+ }
+
+ public String getSvcGroup() {
+ return svcGroup;
+ }
+
+ public void setSvcGroup(String svcGroup) {
+ this.svcGroup = svcGroup;
+ }
+
+ public String getSvcId() {
+ return svcId;
+ }
+
+ public void setSvcId(String svcId) {
+ this.svcId = svcId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public List<SpServiceTag> getTags() {
+ return tags;
+ }
+
+ public void setTags(List<SpServiceTag> tags) {
+ this.tags = tags;
+ }
+
+ public String getHealthCheckPath() {
+ return healthCheckPath;
+ }
+
+ public void setHealthCheckPath(String healthCheckPath) {
+ this.healthCheckPath = healthCheckPath;
+ }
+}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
index 12dde94..220f813 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
@@ -23,7 +23,8 @@ public enum SpServiceTagPrefix {
PROTOCOL("protocol"),
DATA_STREAM("dstream"),
DATA_PROCESSOR("dprocessor"),
- DATA_SINK("dsink");
+ DATA_SINK("dsink"),
+ DATA_SET("dset");
private String prefix;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
similarity index 67%
rename from streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
index 58d0ba7..68f54c2 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
@@ -15,22 +15,27 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.svcdiscovery.api.model;
-public enum PipelineElementUrl {
+public enum SpServiceUrlProvider {
- DATA_PROCESSOR(PipelineElementPrefix.DATA_PROCESSOR),
- DATA_SINK(PipelineElementPrefix.DATA_SINK),
- DATA_STREAM(PipelineElementPrefix.DATA_STREAM),
- DATA_SET(PipelineElementPrefix.DATA_SET),
- ADAPTER(PipelineElementPrefix.ADAPTER);
+ DATA_PROCESSOR(SpServicePathPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_PROCESSOR),
+ DATA_SINK(SpServicePathPrefix.DATA_SINK, SpServiceTagPrefix.DATA_SINK),
+ DATA_STREAM(SpServicePathPrefix.DATA_STREAM, SpServiceTagPrefix.DATA_STREAM),
+ DATA_SET(SpServicePathPrefix.DATA_SET, SpServiceTagPrefix.DATA_SET),
+ ADAPTER(SpServicePathPrefix.ADAPTER, SpServiceTagPrefix.ADAPTER),
+ PROTOCOL(SpServicePathPrefix.PROTOCOL, SpServiceTagPrefix.PROTOCOL);
private final String HTTP = "http://";
private final String SLASH = "/";
+
private final String prefix;
+ private final SpServiceTagPrefix serviceTagPrefix;
- PipelineElementUrl(String prefix) {
+ SpServiceUrlProvider(String prefix,
+ SpServiceTagPrefix serviceTagPrefix) {
this.prefix = prefix;
+ this.serviceTagPrefix = serviceTagPrefix;
}
public String getPrefix() {
@@ -72,5 +77,11 @@ public enum PipelineElementUrl {
+ invocationId;
}
+ public SpServiceTagPrefix getServiceTagPrefix() {
+ return serviceTagPrefix;
+ }
+ public SpServiceTag getServiceTag(String appId) {
+ return SpServiceTag.create(serviceTagPrefix, appId);
+ }
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 105d6da..884546e 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -27,6 +27,9 @@ import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.apache.streampipes.svcdiscovery.consul.model.ConsulServiceRegistrationBody;
import org.apache.streampipes.svcdiscovery.consul.model.HealthCheckConfiguration;
@@ -48,16 +51,12 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
private static final String PE_SVC_TAG = "pe";
@Override
- public void registerService(String svcGroup,
- String svcId,
- String host,
- int port,
- List<SpServiceTag> tags) {
+ public void registerService(SpServiceRegistrationRequest req) {
boolean connected = false;
while (!connected) {
- LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", svcGroup, svcId, host, port);
- ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, asString(tags));
+ LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", req.getSvcGroup(), req.getSvcId(), req.getHost(), req.getPort());
+ ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(req);
connected = registerServiceHttpClient(svcRegistration);
if (!connected) {
@@ -69,7 +68,7 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
}
}
}
- LOG.info("Successfully registered service at Consul: " + svcId);
+ LOG.info("Successfully registered service at Consul: " + req.getSvcId());
}
private List<String> asString(List<SpServiceTag> tags) {
@@ -77,9 +76,17 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
}
@Override
- public List<String> getActivePeEndpoints() {
- LOG.info("Load active pipeline element service endpoints");
- return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PE_SVC_TAG));
+ public List<String> getActivePipelineElementEndpoints() {
+ LOG.info("Discovering active pipeline element service endpoints");
+ return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+ Collections.singletonList(DefaultSpServiceTags.PE.asString()));
+ }
+
+ @Override
+ public List<String> getActiveConnectWorkerEndpoints() {
+ LOG.info("Discovering active StreamPipes Connect worker service endpoints");
+ return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+ Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
}
@Override
@@ -144,24 +151,6 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
*/
private boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
try {
-
-// Consul client = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", 8500)).build();
-// AgentClient agentClient = client.agentClient();
-//
-// String serviceId = "2";
-// Registration service = ImmutableRegistration.builder()
-// .id(serviceId)
-// .name(svcRegistration.getName())
-// .port(svcRegistration.getPort())
-// .address("http://" + InetAddress.getLocalHost().getHostAddress())
-// .check(Registration.RegCheck.http(InetAddress.getLocalHost().getHostAddress() + COLON + svcRegistration.getPort(), 10000))
-// //.check(Registration.RegCheck.ttl(3L)) // registers with a TTL of 3 seconds
-// .tags(Collections.singletonList("tag1"))
-// .meta(Collections.singletonMap("version", "1.0"))
-// .build();
-//
-// agentClient.register(service);
-
String endpoint = makeConsulEndpoint();
String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
@@ -177,17 +166,19 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
return false;
}
- private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
- int port, List<String> tags) {
+ private ConsulServiceRegistrationBody createRegistrationBody(SpServiceRegistrationRequest req) {
ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
- body.setID(id);
- body.setName(svcGroup);
- body.setTags(tags);
- body.setAddress(HTTP_PROTOCOL + host);
- body.setPort(port);
+ body.setID(req.getSvcId());
+ body.setName(req.getSvcGroup());
+ body.setTags(asString(req.getTags()));
+ body.setAddress(HTTP_PROTOCOL + req.getHost());
+ body.setPort(req.getPort());
body.setEnableTagOverride(true);
- body.setCheck(new HealthCheckConfiguration("GET",
- (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+ body.setCheck(new HealthCheckConfiguration(
+ "GET",
+ (HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath()),
+ HEALTH_CHECK_INTERVAL,
+ "60s"));
return body;
}
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
index 94d7342..ed5ca91 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
@@ -21,14 +21,19 @@ public class HealthCheckConfiguration {
private String Method;
private String http;
private String interval;
+ private String deregister_critical_service_after;
public HealthCheckConfiguration() {
}
- public HealthCheckConfiguration(String method, String http, String interval) {
+ public HealthCheckConfiguration(String method,
+ String http,
+ String interval,
+ String deregisterAfter) {
Method = method;
this.http = http;
this.interval = interval;
+ this.deregister_critical_service_after = deregisterAfter;
}
public String getMethod() {
@@ -54,4 +59,12 @@ public class HealthCheckConfiguration {
public void setInterval(String interval) {
this.interval = interval;
}
+
+ public String getDeregister_critical_service_after() {
+ return deregister_critical_service_after;
+ }
+
+ public void setDeregister_critical_service_after(String deregister_critical_service_after) {
+ this.deregister_critical_service_after = deregister_critical_service_after;
+ }
}
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
index f9adc7e..c7fb289 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.service.extensions.base;
import org.apache.streampipes.container.base.StreamPipesServiceBase;
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
SpServiceDefinition serviceDef) throws UnknownHostException {
this.startStreamPipesService(
serviceClass,
- serviceDef.getServiceGroup(),
+ DefaultSpServiceGroups.EXT,
serviceDef.getServiceId(),
serviceDef.getDefaultPort()
);
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
similarity index 70%
rename from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
rename to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
index 861bbe3..e1b7903 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
@@ -18,15 +18,15 @@
package org.apache.streampipes.storage.api;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import java.util.List;
-public interface IRdfEndpointStorage {
+public interface IExtensionsServiceEndpointStorage {
- void addRdfEndpoint(RdfEndpoint rdfEndpoint);
+ void addExtensionsServiceEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint);
- void removeRdfEndpoint(String rdfEndpointId);
+ void removeExtensionsServiceEndpoint(String extensionServiceEndpointId);
- List<RdfEndpoint> getRdfEndpoints();
+ List<ExtensionsServiceEndpoint> getExtensionsServiceEndpoints();
}
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index cdb9ba4..54f8f27 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -41,7 +41,7 @@ public interface INoSqlStorage {
IVisualizationStorage getVisualizationStorageApi();
- IRdfEndpointStorage getRdfEndpointStorage();
+ IExtensionsServiceEndpointStorage getRdfEndpointStorage();
IAssetDashboardStorage getAssetDashboardStorage();
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 608f1f3..288ae0f 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -76,8 +76,8 @@ public enum CouchDbStorageManager implements INoSqlStorage {
}
@Override
- public IRdfEndpointStorage getRdfEndpointStorage() {
- return new RdfEndpointStorageImpl();
+ public IExtensionsServiceEndpointStorage getRdfEndpointStorage() {
+ return new ExtensionsServiceEndpointStorageImpl();
}
@Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
similarity index 55%
rename from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java
rename to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
index e2ffb70..c9d454f 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
@@ -18,33 +18,32 @@
package org.apache.streampipes.storage.couchdb.impl;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.storage.api.IRdfEndpointStorage;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.storage.api.IExtensionsServiceEndpointStorage;
import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
import org.apache.streampipes.storage.couchdb.utils.Utils;
import java.util.List;
-public class RdfEndpointStorageImpl extends AbstractDao<RdfEndpoint> implements IRdfEndpointStorage {
+public class ExtensionsServiceEndpointStorageImpl extends AbstractDao<ExtensionsServiceEndpoint> implements IExtensionsServiceEndpointStorage {
- public RdfEndpointStorageImpl() {
- super(Utils::getCouchDbRdfEndpointClient, RdfEndpoint.class);
- }
+ public ExtensionsServiceEndpointStorageImpl() {
+ super(Utils::getCouchDbRdfEndpointClient, ExtensionsServiceEndpoint.class);
+ }
+ @Override
+ public void addExtensionsServiceEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint) {
+ persist(extensionsServiceEndpoint);
+ }
- @Override
- public void addRdfEndpoint(RdfEndpoint rdfEndpoint) {
- persist(rdfEndpoint);
- }
+ @Override
+ public void removeExtensionsServiceEndpoint(String rdfEndpointId) {
+ delete(rdfEndpointId);
+ }
- @Override
- public void removeRdfEndpoint(String rdfEndpointId) {
- delete(rdfEndpointId)
-; }
-
- @Override
- public List<RdfEndpoint> getRdfEndpoints() {
- return findAll();
- }
+ @Override
+ public List<ExtensionsServiceEndpoint> getExtensionsServiceEndpoints() {
+ return findAll();
+ }
}
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index 391b7f1..aca69a5 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -79,14 +79,14 @@ export class DataMarketplaceService {
stopAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
return this.http.post(this.adapterMasterUrl
- + adapter.couchDBId
+ + adapter.id
+ "/stop", {})
.pipe(map(response => Message.fromData(response as any)));
}
startAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
return this.http.post(this.adapterMasterUrl
- + adapter.couchDBId
+ + adapter.id
+ "/start", {})
.pipe(map(response => Message.fromData(response as any)));;
}
@@ -116,7 +116,7 @@ export class DataMarketplaceService {
this.host +
this.authStatusService.email +
url +
- adapter.couchDBId
+ adapter.id
);
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 4fa3957..9ac229d 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-05-24 18:53:06.
+// Generated using typescript-generator version 2.27.744 on 2021-06-21 21:05:13.
export class AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -120,6 +120,7 @@ export class Accuracy extends EventPropertyQualityDefinition {
export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+ _rev: string;
appId: string;
applicationLinks: ApplicationLink[];
connectedTo: string[];
@@ -151,25 +152,26 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
instance.internallyManaged = data.internallyManaged;
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.dom = data.dom;
instance.uri = data.uri;
+ instance.dom = data.dom;
+ instance._rev = data._rev;
return instance;
}
}
export class AdapterDescription extends NamedStreamPipesEntity {
"@class": "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription" | "org.apache.streampipes.mod [...]
- _rev: string;
adapterId: string;
adapterType: string;
category: string[];
config: StaticPropertyUnion[];
- couchDBId: string;
createdAt: number;
eventGrounding: EventGrounding;
icon: string;
+ id: string;
rules: TransformationRuleDescriptionUnion[];
schemaRules: any[];
+ selectedEndpointUrl: string;
streamRules: any[];
userName: string;
valueRules: any[];
@@ -189,11 +191,11 @@ export class AdapterDescription extends NamedStreamPipesEntity {
instance.rules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.rules);
instance.category = __getCopyArrayFn(__identity<string>())(data.category);
instance.createdAt = data.createdAt;
+ instance.selectedEndpointUrl = data.selectedEndpointUrl;
+ instance.id = data.id;
+ instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
instance.schemaRules = __getCopyArrayFn(__identity<any>())(data.schemaRules);
- instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
- instance.couchDBId = data.couchDBId;
- instance._rev = data._rev;
return instance;
}
@@ -979,6 +981,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
correspondingPipeline: string;
correspondingUser: string;
inputStreams: SpDataStreamUnion[];
+ selectedEndpointUrl: string;
staticProperties: StaticPropertyUnion[];
statusInfoSettings: ElementStatusInfoSettings;
streamRequirements: SpDataStreamUnion[];
@@ -1001,6 +1004,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
instance.streamRequirements = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.streamRequirements);
instance.configured = data.configured;
instance.uncompleted = data.uncompleted;
+ instance.selectedEndpointUrl = data.selectedEndpointUrl;
return instance;
}
}
@@ -1647,9 +1651,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
}
const instance = target || new GenericAdapterSetDescription();
super.fromData(data, instance);
- instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
- instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
+ instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+ instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
return instance;
}
}
@@ -1666,9 +1670,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
}
const instance = target || new GenericAdapterStreamDescription();
super.fromData(data, instance);
- instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
- instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
+ instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+ instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
return instance;
}
}