You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/07 10:27:48 UTC
[incubator-streampipes] 02/03: Working on simplifying adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit c87cec6ff0fa7ef1cecee2d5fa321d152a5c597c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Oct 6 19:15:37 2021 +0200
Working on simplifying adapters
---
.../master/health/AdapterHealthCheck.java | 4 +-
.../master/management/AdapterMasterManagement.java | 50 +++++----
.../AdapterTemplateMasterManagement.java | 1 +
.../master/management/DescriptionManagement.java | 23 ++--
.../master/management/SourcesManagement.java | 8 +-
.../management/WorkerAdministrationManagement.java | 28 ++++-
.../master/management/WorkerRestClient.java | 6 +-
.../master/management/WorkerUrlProvider.java | 15 +--
.../management/AdapterMasterManagementTest.java | 15 ++-
.../AdapterTemplateMasterManagementTest.java | 1 +
.../master/management/SourcesManagementTest.java | 15 ++-
.../worker/init/AdapterWorkerContainer.java | 2 +-
.../init/ConnectWorkerDescriptionProvider.java | 3 +
.../init/ConnectWorkerRegistrationService.java | 5 +-
.../extensions/ExtensionsModelSubmitter.java | 2 +-
.../model/connect/adapter/AdapterDescription.java | 3 +
.../connect/adapter/AdapterDescriptionList.java | 1 +
.../apache/streampipes/rest/ResetManagement.java | 2 +-
.../rest/impl/connect/AdapterResource.java | 8 +-
.../rest/impl/connect/AdapterTemplateResource.java | 1 +
.../rest/impl/connect/DescriptionResource.java | 5 +-
.../rest/impl/connect/SourcesResource.java | 6 +-
.../rest/impl/connect/WelcomePageMaster.java | 2 +-
.../impl/connect/WorkerAdministrationResource.java | 2 +-
.../storage/api/IAdapterTemplateStorage.java | 1 +
.../streampipes/storage/api/INoSqlStorage.java | 2 -
.../storage/couchdb/CouchDbStorageManager.java | 7 +-
...mpl.java => AdapterDescriptionStorageImpl.java} | 12 +--
...geImpl.java => AdapterInstanceStorageImpl.java} | 8 +-
.../couchdb/impl/AdapterTemplateStorageImpl.java | 3 +-
.../streampipes/storage/couchdb/utils/Utils.java | 8 +-
.../data-marketplace/data-marketplace.component.ts | 26 ++---
.../connect/services/data-marketplace.service.ts | 119 ++++++++++-----------
33 files changed, 201 insertions(+), 193 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 89bdd64..f343c8b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -22,7 +22,7 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import java.util.List;
@@ -33,7 +33,7 @@ public class AdapterHealthCheck {
public void checkAndRestoreAdapters() {
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
- IAdapterStorage adapterStorage = new AdapterStorageImpl();
+ IAdapterStorage adapterStorage = new AdapterInstanceStorageImpl();
// Get all adapters
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 77342c5..ab70e8e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -35,7 +35,8 @@ import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterDescriptionStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.apache.streampipes.storage.management.StorageManager;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.slf4j.Logger;
@@ -53,20 +54,21 @@ public class AdapterMasterManagement {
private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
- private IAdapterStorage adapterStorage;
+ private IAdapterStorage adapterInstanceStorage;
private WorkerUrlProvider workerUrlProvider;
public AdapterMasterManagement() {
- this.adapterStorage = getAdapterStorage();
+ this.adapterInstanceStorage = getAdapterInstanceStorage();
+ this.adapterDescriptionStorage = new AdapterDescriptionStorageImpl();
this.workerUrlProvider = new WorkerUrlProvider();
}
public AdapterMasterManagement(IAdapterStorage adapterStorage) {
- this.adapterStorage = adapterStorage;
+ this.adapterInstanceStorage = adapterStorage;
}
public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException, NoServiceEndpointsAvailableException {
- IAdapterStorage adapterStorage = getAdapterStorage();
+ IAdapterStorage adapterStorage = getAdapterInstanceStorage();
List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
for (AdapterDescription ad : allAdapters) {
@@ -97,13 +99,12 @@ public class AdapterMasterManagement {
ad.setElementId(ad.getElementId() + ":" + uuid);
ad.setCreatedAt(System.currentTimeMillis());
ad.setSelectedEndpointUrl(endpointUrl);
- ad.setCorrespondingServiceGroup(new WorkerUrlProvider().getWorkerServiceGroup(ad.getAppId()));
AdapterDescription encryptedAdapterDescription =
new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
// store in db
- String adapterId = adapterStorage.storeAdapter(encryptedAdapterDescription);
+ String adapterId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
// start when stream adapter
if (ad instanceof AdapterStreamDescription) {
@@ -129,7 +130,7 @@ public class AdapterMasterManagement {
}
public AdapterDescription getAdapter(String id) throws AdapterException {
- List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+ List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
if (allAdapters != null && id != null) {
for (AdapterDescription ad : allAdapters) {
@@ -161,9 +162,9 @@ public class AdapterMasterManagement {
}
- AdapterDescription ad = adapterStorage.getAdapter(elementId);
+ AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
String username = ad.getUserName();
- adapterStorage.deleteAdapter(elementId);
+ adapterInstanceStorage.deleteAdapter(elementId);
LOG.info("Successfully deleted adapter: " + elementId);
UserService userService = getUserService();
@@ -176,9 +177,20 @@ public class AdapterMasterManagement {
}
}
- public List<AdapterDescription> getAllAdapters() throws AdapterException {
+ public List<AdapterDescription> getAllAdapterInstances() throws AdapterException {
- List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+ List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
+
+ if (allAdapters == null) {
+ throw new AdapterException("Could not get all adapters");
+ }
+
+ return allAdapters;
+ }
+
+ public List<AdapterDescription> getAllAdapterDescriptions() throws AdapterException {
+
+ List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
if (allAdapters == null) {
throw new AdapterException("Could not get all adapters");
@@ -187,7 +199,7 @@ public class AdapterMasterManagement {
return allAdapters;
}
- public void stopSetAdapter(String adapterId, String baseUrl, AdapterStorageImpl adapterStorage) throws AdapterException {
+ public void stopSetAdapter(String adapterId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(adapterId);
@@ -195,7 +207,7 @@ public class AdapterMasterManagement {
}
public void stopStreamAdapter(String elementId) throws AdapterException {
- AdapterDescription ad = adapterStorage.getAdapter(elementId);
+ AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
if (!isStreamAdapter(elementId)) {
throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
@@ -207,7 +219,7 @@ public class AdapterMasterManagement {
public void startStreamAdapter(String elementId) throws AdapterException {
// TODO ensure that adapter is not started twice
- AdapterDescription ad = adapterStorage.getAdapter(elementId);
+ AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
try {
String endpointUrl = findEndpointUrl(ad);
URI uri = new URI(endpointUrl);
@@ -216,7 +228,7 @@ public class AdapterMasterManagement {
throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
} else {
ad.setSelectedEndpointUrl(baseUrl);
- adapterStorage.updateAdapter(ad);
+ adapterInstanceStorage.updateAdapter(ad);
AdapterDescription decryptedAdapterDescription =
new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
@@ -232,7 +244,7 @@ public class AdapterMasterManagement {
}
public boolean isStreamAdapter(String id) {
- AdapterDescription adapterDescription = adapterStorage.getAdapter(id);
+ AdapterDescription adapterDescription = adapterInstanceStorage.getAdapter(id);
return isStreamAdapter(adapterDescription);
}
@@ -240,8 +252,8 @@ public class AdapterMasterManagement {
return adapterDescription instanceof AdapterStreamDescription;
}
- private IAdapterStorage getAdapterStorage() {
- return new AdapterStorageImpl();
+ private IAdapterStorage getAdapterInstanceStorage() {
+ return new AdapterInstanceStorageImpl();
}
private String findEndpointUrl(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
index b3df802..fec1eb9 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.UUID;
+@Deprecated
public class AdapterTemplateMasterManagement {
private IAdapterTemplateStorage adapterTemplateStorage;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index b2e60a0..49f23b2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -19,14 +19,15 @@
package org.apache.streampipes.connect.container.master.management;
import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescriptionList;
import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.couchdb.impl.AdapterDescriptionStorageImpl;
import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
import java.util.List;
@@ -35,6 +36,7 @@ import java.util.Optional;
public class DescriptionManagement {
+ @Deprecated
public ProtocolDescriptionList getProtocols() {
ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
@@ -61,22 +63,13 @@ public class DescriptionManagement {
return result;
}
- public AdapterDescriptionList getAdapters() {
- ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
-
- List<ConnectWorkerContainer> allWorkerContainter = connectionWorkerContainerStorage.getAllConnectWorkerContainers();
-
- AdapterDescriptionList result = new AdapterDescriptionList();
-
- for (ConnectWorkerContainer connectWorkerContainer : allWorkerContainter) {
- result.getList().addAll(connectWorkerContainer.getAdapters());
- }
-
- return result;
+ public List<AdapterDescription> getAdapters() {
+ IAdapterStorage adapterStorage = new AdapterDescriptionStorageImpl();
+ return adapterStorage.getAllAdapters();
}
public Optional<AdapterDescription> getAdapter(String id) {
- return getAdapters().getList().stream()
+ return getAdapters().stream()
.filter(desc -> desc.getAppId().equals(id))
.findFirst();
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index e48520e..f362473 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -33,7 +33,7 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,17 +47,17 @@ public class SourcesManagement {
private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
- private AdapterStorageImpl adapterStorage;
+ private AdapterInstanceStorageImpl adapterStorage;
private WorkerUrlProvider workerUrlProvider;
private String connectHost = null;
- public SourcesManagement(AdapterStorageImpl adapterStorage) {
+ public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
this.adapterStorage = adapterStorage;
this.workerUrlProvider = new WorkerUrlProvider();
}
public SourcesManagement() {
- this(new AdapterStorageImpl());
+ this(new AdapterInstanceStorageImpl());
}
public void addAdapter(String streamId, SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index 2df8f95..a3be118 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -19,11 +19,15 @@
package org.apache.streampipes.connect.container.master.management;
import org.apache.streampipes.connect.container.master.health.AdapterHealthCheck;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class WorkerAdministrationManagement {
private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
@@ -31,18 +35,34 @@ public class WorkerAdministrationManagement {
private ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage;
private AdapterMasterManagement adapterMasterManagement;
+ private IAdapterStorage adapterStorage;
+
private AdapterHealthCheck adapterHealthCheck;
public WorkerAdministrationManagement() {
this.connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
this.adapterMasterManagement = new AdapterMasterManagement();
this.adapterHealthCheck = new AdapterHealthCheck();
+ this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterStorage();
}
- public void register(ConnectWorkerContainer connectWorker) {
- // TODO how do I register the protocols and adapters of a worker?
+ public void register(List<AdapterDescription> availableAdapterDescription) {
+
+ List<AdapterDescription> alreadyRegisteredAdapters = this.adapterStorage.getAllAdapters();
+
+ availableAdapterDescription.forEach(adapterDescription -> {
+
+ // only install one adapter description per service group
+ boolean alreadyInstalled = alreadyRegisteredAdapters
+ .stream()
+ .anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()) && a.getCorrespondingServiceGroup().equals(adapterDescription.getCorrespondingServiceGroup()));
+ if (!alreadyInstalled) {
+ this.adapterStorage.storeAdapter(adapterDescription);
+ }
+ });
+
+// this.adapterHealthCheck.checkAndRestoreAdapters();
- this.adapterHealthCheck.checkAndRestoreAdapters();
// Check if already registered
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 2043732..a873f8c 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public class WorkerRestClient {
public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
String url = baseUrl + WorkerPaths.getStreamStopPath();
- AdapterDescription ad = getAdapterDescriptionById(new AdapterStorageImpl(), adapterStreamDescription.getElementId());
+ AdapterDescription ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
stopAdapter(ad, url);
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
@@ -196,7 +196,7 @@ public class WorkerRestClient {
}
- private static AdapterDescription getAdapterDescriptionById(AdapterStorageImpl adapterStorage, String id) {
+ private static AdapterDescription getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) {
AdapterDescription adapterDescription = null;
List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
for (AdapterDescription a : allAdapters) {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
index 2b93972..9a0648e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
@@ -28,8 +28,6 @@ import org.apache.streampipes.storage.api.IConnectWorkerContainerStorage;
import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
-import java.util.List;
-
public class WorkerUrlProvider {
private IConnectWorkerContainerStorage connectWorkerContainerStorage;
@@ -81,18 +79,7 @@ public class WorkerUrlProvider {
}
private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) {
- SpServiceUrlProvider provider = null;
- List<ConnectWorkerContainer> allConnectWorkerContainer = this.connectWorkerContainerStorage.getAllConnectWorkerContainers();
-
- for (ConnectWorkerContainer connectWorkerContainer : allConnectWorkerContainer) {
- if (connectWorkerContainer.getProtocols().stream().anyMatch(p -> p.getAppId().equals(appId))) {
- provider = SpServiceUrlProvider.PROTOCOL;
- } else if (connectWorkerContainer.getAdapters().stream().anyMatch(a -> a.getAppId().equals(appId))) {
- provider = SpServiceUrlProvider.ADAPTER;
- }
- }
-
- return new ExtensionsServiceEndpointGenerator(appId, provider);
+ return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.ADAPTER);
}
}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
index 7b6dfb4..6c874e3 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.connect.container.master.management;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.junit.Test;
import java.util.Arrays;
@@ -35,7 +35,7 @@ public class AdapterMasterManagementTest {
@Test(expected = AdapterException.class)
public void getAdapterFailNull() throws AdapterException {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(null);
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
@@ -46,7 +46,7 @@ public class AdapterMasterManagementTest {
@Test(expected = AdapterException.class)
public void getAdapterFail() throws AdapterException {
List<AdapterDescription> adapterDescriptions = Arrays.asList(new GenericAdapterStreamDescription());
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(adapterDescriptions);
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
@@ -57,25 +57,24 @@ public class AdapterMasterManagementTest {
@Test
public void getAllAdaptersSuccess() throws AdapterException {
List<AdapterDescription> adapterDescriptions = Arrays.asList(new GenericAdapterStreamDescription());
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(adapterDescriptions);
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
- List<AdapterDescription> result = adapterMasterManagement.getAllAdapters();
+ List<AdapterDescription> result = adapterMasterManagement.getAllAdapterInstances();
assertEquals(1, result.size());
- assertEquals(GenericAdapterStreamDescription.ID, result.get(0).getUri());
}
@Test(expected = AdapterException.class)
public void getAllAdaptersFail() throws AdapterException {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(null);
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
- adapterMasterManagement.getAllAdapters();
+ adapterMasterManagement.getAllAdapterInstances();
}
}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
index 6fc7133..17bc969 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+@Deprecated
public class AdapterTemplateMasterManagementTest {
@Test
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index b007fcb..bdad3b9 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -22,7 +22,7 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -56,7 +56,7 @@ public class SourcesManagementTest {
@Ignore
@Test
public void addAdapterSuccess() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
doNothing().when(WorkerRestClient.class, "invokeSetAdapter", anyString(), any());
@@ -72,7 +72,7 @@ public class SourcesManagementTest {
@Ignore
@Test(expected = AdapterException.class)
public void addAdapterFail() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
@@ -84,7 +84,7 @@ public class SourcesManagementTest {
@Ignore
@Test
public void detachAdapterSuccess() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
doNothing().when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
@@ -99,7 +99,7 @@ public class SourcesManagementTest {
@Ignore
@Test(expected = AdapterException.class)
public void detachAdapterFail() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
org.powermock.api.mockito.PowerMockito.doThrow(new AdapterException()).when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
@@ -111,7 +111,7 @@ public class SourcesManagementTest {
@Test
public void getAllAdaptersInstallDescriptionSuccess() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
@@ -124,9 +124,8 @@ public class SourcesManagementTest {
@Test(expected = AdapterException.class)
public void getAllAdaptersInstallDescriptionFail() throws Exception {
- AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+ AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
AdapterDescription adapterDescription = new GenericAdapterSetDescription();
- adapterDescription.setUri(" ");
when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
sourcesManagement.setConnectHost("host");
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index 6f868dd..032c633 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -50,7 +50,7 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
@Override
public void afterServiceRegistered(SpServiceDefinition serviceDef) {
- new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
+ new ConnectWorkerRegistrationService().registerWorker(serviceDef);
}
@Override
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index 0b48a29..0fbc203 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -44,6 +44,7 @@ public class ConnectWorkerDescriptionProvider {
List<AdapterDescription> adapters = new ArrayList<>();
for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel());
+ desc.setCorrespondingServiceGroup(serviceGroup);
adapters.add(desc);
}
@@ -54,11 +55,13 @@ public class ConnectWorkerDescriptionProvider {
GenericAdapterStreamDescription desc = new GenericAdapterStreamDescription();
desc.setAppId(protocolDescription.getAppId());
desc.setProtocolDescription(protocolDescription);
+ desc.setCorrespondingServiceGroup(serviceGroup);
adapters.add(desc);
} else if (protocolDescription instanceof ProtocolSetDescription) {
GenericAdapterSetDescription desc = new GenericAdapterSetDescription();
desc.setAppId(protocolDescription.getAppId());
desc.setProtocolDescription(protocolDescription);
+ desc.setCorrespondingServiceGroup(serviceGroup);
adapters.add(desc);
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index 44a242e..9de2137 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.container.worker.init;
import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
+import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.slf4j.Logger;
@@ -30,7 +31,7 @@ public class ConnectWorkerRegistrationService {
private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerRegistrationService.class);
- public void registerWorker(String serviceGroup) {
+ public void registerWorker(SpServiceDefinition serviceDef) {
boolean connected = false;
while (!connected) {
@@ -39,7 +40,7 @@ public class ConnectWorkerRegistrationService {
String masterUrl = coreServices.get(0) + "/streampipes-backend";
LOG.info("Trying to connect to master: " + masterUrl);
connected = MasterRestClient.register(masterUrl,
- new ConnectWorkerDescriptionProvider().getContainerDescription(serviceGroup));
+ new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
if (connected) {
LOG.info("Successfully connected to master: " + masterUrl + " Worker is now running.");
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index a95b185..63b8d15 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -50,7 +50,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
@Override
public void afterServiceRegistered(SpServiceDefinition serviceDef) {
- new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
+ new ConnectWorkerRegistrationService().registerWorker(serviceDef);
}
@Override
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index eeb9189..0a171da 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -61,6 +61,9 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
// Is used to store where the adapter is running to stop it
private String selectedEndpointUrl;
+ /**
+ * This is used to identify all the services within the service group in which the adapter can be invoked
+ */
private String correspondingServiceGroup;
public AdapterDescription() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
index 5718aa3..81b6484 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
@TsModel
+@Deprecated
public class AdapterDescriptionList extends NamedStreamPipesEntity {
private List<AdapterDescription> list;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 52721d9..680879a 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -68,7 +68,7 @@ public class ResetManagement {
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
try {
- List<AdapterDescription> allAdapters = adapterMasterManagement.getAllAdapters();
+ List<AdapterDescription> allAdapters = adapterMasterManagement.getAllAdapterInstances();
allAdapters.forEach(adapterDescription -> {
try {
adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 0831269..68e8594 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.slf4j.Logger;
@@ -117,7 +116,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
try {
managementService.deleteAdapter(elementId);
- return ok(Notifications.success("Adapter deleted."));
+ return ok(Notifications.success("Adapter with id: " + elementId + " is deleted."));
} catch (AdapterException e) {
LOG.error("Error while deleting adapter with id " + elementId, e);
return ok(Notifications.error(e.getMessage()));
@@ -129,9 +128,8 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
@Produces(MediaType.APPLICATION_JSON)
public Response getAllAdapters() {
try {
- List<AdapterDescription> allAdapterDescription = managementService.getAllAdapters();
- AdapterDescriptionList result = new AdapterDescriptionList();
- result.setList(allAdapterDescription);
+ // TODO get all invoked adapters
+ List<AdapterDescription> result = managementService.getAllAdapterInstances();
return ok(result);
} catch (AdapterException e) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
index aa57494..4763666 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+@Deprecated
@Path("/v2/connect/master/adapters/template")
public class AdapterTemplateResource extends AbstractAdapterResource<AdapterTemplateMasterManagement> {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
index 9f9ab7e..f22648e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescriptionList;
@@ -36,6 +35,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.util.List;
import java.util.Optional;
@Path("/v2/connect/master/description")
@@ -59,6 +59,7 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
return ok(result);
}
+ @Deprecated
@GET
@JacksonSerialized
@Path("/protocols")
@@ -74,7 +75,7 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
@Path("/adapters")
@Produces(MediaType.APPLICATION_JSON)
public Response getAdapters() {
- AdapterDescriptionList result = managementService.getAdapters();
+ List<AdapterDescription> result = managementService.getAdapters();
return ok(result);
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index cfb611a..8b6d4cd 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -75,13 +75,13 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
public Response addAdapter(@PathParam("streamId") String elementId,
SpDataSet dataSet) {
- String responseMessage = "Instance of data set " + dataSet.getUri() + " successfully started";
+ String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";
try {
managementService.addAdapter(elementId, dataSet);
} catch (AdapterException | NoServiceEndpointsAvailableException e) {
- LOG.error("Could not set data set instance: " + dataSet.getUri(), e);
- return ok(Notifications.error("Could not set data set instance: " + dataSet.getUri()));
+ LOG.error("Could not set data set instance: " + dataSet.getElementId(), e);
+ return ok(Notifications.error("Could not set data set instance: " + dataSet.getElementId()));
}
return ok(Notifications.success(responseMessage));
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
index 46e82e1..5730300 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
@@ -77,7 +77,7 @@ public class WelcomePageMaster extends AbstractAdapterResource<AdapterMasterMana
private void getAllRunningAdapters(HtmlCanvas canvas) throws IOException {
try {
- for (AdapterDescription ad : managementService.getAllAdapters()) {
+ for (AdapterDescription ad : managementService.getAllAdapterDescriptions()) {
canvas.li().write(ad.getElementId())._li();
canvas.ul().li().write("Kafka Topic: " + GroundingService.extractTopic(ad))._li()._ul();
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index a225b8c..27c7a94 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -51,7 +51,7 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
// Change this to List<AdapterDescription>
// How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
// LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
-// this.workerAdministrationManagement.register(connectWorkerContainer);
+ this.workerAdministrationManagement.register(availableAdapterDescription);
System.out.println(availableAdapterDescription);
return ok(Notifications.success("Worker Container successfully added"));
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
index add9fa6..b2f7352 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import java.util.List;
+@Deprecated
public interface IAdapterTemplateStorage {
List<AdapterDescription> getAllAdapterTemplates();
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 54f8f27..d4e6415 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -21,8 +21,6 @@ public interface INoSqlStorage {
IAdapterStorage getAdapterStorage();
- IAdapterTemplateStorage getAdapterTemplateStorage();
-
ICategoryStorage getCategoryStorageAPI();
ILabelStorage getLabelStorageAPI();
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 288ae0f..380cb23 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -26,12 +26,7 @@ public enum CouchDbStorageManager implements INoSqlStorage {
@Override
public IAdapterStorage getAdapterStorage() {
- return new AdapterStorageImpl();
- }
-
- @Override
- public IAdapterTemplateStorage getAdapterTemplateStorage() {
- return new AdapterTemplateStorageImpl();
+ return new AdapterDescriptionStorageImpl();
}
@Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
similarity index 87%
copy from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
copy to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
index 8dd1fec..327e562 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
@@ -18,26 +18,26 @@
package org.apache.streampipes.storage.couchdb.impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
import org.apache.streampipes.storage.couchdb.dao.DbCommand;
import org.apache.streampipes.storage.couchdb.dao.FindCommand;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
-public class AdapterStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
+public class AdapterDescriptionStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
- Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+ Logger LOG = LoggerFactory.getLogger(AdapterDescriptionStorageImpl.class);
private static final String SYSTEM_USER = "system";
- public AdapterStorageImpl() {
- super(Utils::getCouchDbAdapterClient, AdapterDescription.class);
+ public AdapterDescriptionStorageImpl() {
+ super(Utils::getCouchDbAdapterDescriptionClient, AdapterDescription.class);
}
@Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
similarity index 88%
rename from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
rename to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
index 8dd1fec..da84fb8 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
@@ -30,14 +30,14 @@ import org.apache.streampipes.storage.couchdb.utils.Utils;
import java.util.List;
import java.util.Optional;
-public class AdapterStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
+public class AdapterInstanceStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
- Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+ Logger LOG = LoggerFactory.getLogger(AdapterInstanceStorageImpl.class);
private static final String SYSTEM_USER = "system";
- public AdapterStorageImpl() {
- super(Utils::getCouchDbAdapterClient, AdapterDescription.class);
+ public AdapterInstanceStorageImpl() {
+ super(Utils::getCouchDbAdapterInstanceClient, AdapterDescription.class);
}
@Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
index 99d5373..b7fed7b 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
@@ -30,9 +30,10 @@ import org.apache.streampipes.storage.couchdb.utils.Utils;
import java.util.List;
import java.util.Optional;
+@Deprecated
public class AdapterTemplateStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterTemplateStorage {
- Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+ Logger LOG = LoggerFactory.getLogger(AdapterInstanceStorageImpl.class);
private static final String SYSTEM_USER = "system";
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index bbfd343..ed360bc 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -68,8 +68,12 @@ public class Utils {
return getCouchDbGsonClient("assetdashboard");
}
- public static CouchDbClient getCouchDbAdapterClient() {
- return getCouchDbAdapterClient("adapter");
+ public static CouchDbClient getCouchDbAdapterInstanceClient() {
+ return getCouchDbAdapterClient("adapterinstance");
+ }
+
+ public static CouchDbClient getCouchDbAdapterDescriptionClient() {
+ return getCouchDbAdapterClient("adapterdescription");
}
public static CouchDbClient getCouchDbPipelineClient() {
diff --git a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
index 2d0a49e..26c1b8d 100644
--- a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
@@ -91,10 +91,10 @@ export class DataMarketplaceComponent implements OnInit {
this.adapterDescriptions = [];
this.dataMarketplaceService
- .getGenericAndSpecificAdapterDescriptions()
+ .getAdapterDescriptions()
.subscribe((allAdapters) => {
- this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[0]);
- this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[1]);
+ this.adapterDescriptions = allAdapters;
+ // this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[1]);
this.adapterDescriptions
.sort((a, b) => a.name.localeCompare(b.name));
this.filteredAdapterDescriptions = this.adapterDescriptions;
@@ -105,16 +105,16 @@ export class DataMarketplaceComponent implements OnInit {
this.adapterLoadingError = true;
});
- this.dataMarketplaceService.getAdapterTemplates().subscribe(adapterTemplates => {
- adapterTemplates.forEach((adapterTemplate) => {
- (adapterTemplate as any).isTemplate = true;
- });
-
- this.adapterDescriptions = this.adapterDescriptions.concat(adapterTemplates);
- this.adapterDescriptions
- .sort((a, b) => a.name.localeCompare(b.name));
- this.filteredAdapterDescriptions = this.adapterDescriptions;
- });
+ // this.dataMarketplaceService.getAdapterTemplates().subscribe(adapterTemplates => {
+ // adapterTemplates.forEach((adapterTemplate) => {
+ // (adapterTemplate as any).isTemplate = true;
+ // });
+ //
+ // this.adapterDescriptions = this.adapterDescriptions.concat(adapterTemplates);
+ // this.adapterDescriptions
+ // .sort((a, b) => a.name.localeCompare(b.name));
+ // this.filteredAdapterDescriptions = this.adapterDescriptions;
+ // });
}
getAdaptersRunning(): void {
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index 9f1a9b7..50f73bd 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -23,20 +23,14 @@ import { AuthStatusService } from '../../services/auth-status.service';
import { ConnectService } from './connect.service';
import {
AdapterDescription,
- AdapterDescriptionList,
AdapterDescriptionUnion,
- EventSchema,
GenericAdapterSetDescription,
GenericAdapterStreamDescription,
Message,
- ProtocolDescription,
- ProtocolDescriptionList,
- SpDataSet,
- SpDataStream,
SpecificAdapterSetDescription,
SpecificAdapterStreamDescription
} from '../../core-model/gen/streampipes-model';
-import { Observable, zip } from 'rxjs';
+import { Observable } from 'rxjs';
import { PlatformServicesCommons } from '../../platform-services/apis/commons.service';
@Injectable()
@@ -54,21 +48,14 @@ export class DataMarketplaceService {
}
getAdapterDescriptions(): Observable<AdapterDescriptionUnion[]> {
- return this.requestAdapterDescriptions('/master/description/adapters').pipe(map(response => {
- return (response as any[])
- .map(resp => AdapterDescription.fromDataUnion(resp))
- .filter(ad => this.connectService.isSpecificDescription(ad));
- }));
+ return this.requestAdapterDescriptions('/master/description/adapters');
+
}
getAdapters(): Observable<AdapterDescriptionUnion[]> {
return this.requestAdapterDescriptions('/master/adapters');
}
- getAdapterTemplates(): Observable<AdapterDescriptionUnion[]> {
- return this.requestAdapterDescriptions('/master/adapters/template/all');
- }
-
requestAdapterDescriptions(path: string): Observable<AdapterDescriptionUnion[]> {
return this.http
.get(
@@ -76,8 +63,7 @@ export class DataMarketplaceService {
path
)
.pipe(map(response => {
- const adapterDescriptionList: AdapterDescriptionList = AdapterDescriptionList.fromData(response as AdapterDescriptionList);
- return adapterDescriptionList.list;
+ return (response as any[]).map(p => AdapterDescription.fromDataUnion(p));
}));
}
@@ -120,50 +106,53 @@ export class DataMarketplaceService {
);
}
- getProtocols(): Observable<AdapterDescriptionUnion[]> {
- return this.http
- .get(
- `${this.connectPath}/master/description/protocols`
- )
- .pipe(map(response => {
- const adapterDescriptions: AdapterDescriptionUnion[] = [];
- const protocols: ProtocolDescription[] = (ProtocolDescriptionList.fromData(response as ProtocolDescriptionList)).list;
-
- for (const protocol of protocols) {
- let newAdapterDescription: AdapterDescriptionUnion;
- if (protocol.sourceType === 'SET') {
- newAdapterDescription = new GenericAdapterSetDescription();
- newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription';
- newAdapterDescription.dataSet = new SpDataSet();
- newAdapterDescription.dataSet.eventSchema = new EventSchema();
- } else if (protocol.sourceType === 'STREAM') {
- newAdapterDescription = new GenericAdapterStreamDescription();
- newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription';
- newAdapterDescription.dataStream = new SpDataStream();
- newAdapterDescription.dataStream.eventSchema = new EventSchema();
- }
- newAdapterDescription.appId = protocol.appId;
- newAdapterDescription.name = protocol.name;
- newAdapterDescription.description = protocol.description;
- newAdapterDescription.iconUrl = protocol.iconUrl;
- newAdapterDescription.uri = newAdapterDescription.elementId;
- newAdapterDescription.category = protocol.category;
- newAdapterDescription.includedAssets = protocol.includedAssets;
- newAdapterDescription.includesAssets = protocol.includesAssets;
- newAdapterDescription.includedLocales = protocol.includedLocales;
- newAdapterDescription.includesLocales = protocol.includesLocales;
-
- if (
- newAdapterDescription instanceof GenericAdapterSetDescription ||
- newAdapterDescription instanceof GenericAdapterStreamDescription
- ) {
- newAdapterDescription.protocolDescription = protocol;
- }
- adapterDescriptions.push(newAdapterDescription);
- }
- return adapterDescriptions;
- }));
- }
+ /**
+ * @deprecated The method should not be used
+ */
+ // getProtocols(): Observable<AdapterDescriptionUnion[]> {
+ // return this.http
+ // .get(
+ // `${this.connectPath}/master/description/protocols`
+ // )
+ // .pipe(map(response => {
+ // const adapterDescriptions: AdapterDescriptionUnion[] = [];
+ // const protocols: ProtocolDescription[] = (ProtocolDescriptionList.fromData(response as ProtocolDescriptionList)).list;
+ //
+ // for (const protocol of protocols) {
+ // let newAdapterDescription: AdapterDescriptionUnion;
+ // if (protocol.sourceType === 'SET') {
+ // newAdapterDescription = new GenericAdapterSetDescription();
+ // newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription';
+ // newAdapterDescription.dataSet = new SpDataSet();
+ // newAdapterDescription.dataSet.eventSchema = new EventSchema();
+ // } else if (protocol.sourceType === 'STREAM') {
+ // newAdapterDescription = new GenericAdapterStreamDescription();
+ // newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription';
+ // newAdapterDescription.dataStream = new SpDataStream();
+ // newAdapterDescription.dataStream.eventSchema = new EventSchema();
+ // }
+ // newAdapterDescription.appId = protocol.appId;
+ // newAdapterDescription.name = protocol.name;
+ // newAdapterDescription.description = protocol.description;
+ // newAdapterDescription.iconUrl = protocol.iconUrl;
+ // newAdapterDescription.uri = newAdapterDescription.elementId;
+ // newAdapterDescription.category = protocol.category;
+ // newAdapterDescription.includedAssets = protocol.includedAssets;
+ // newAdapterDescription.includesAssets = protocol.includesAssets;
+ // newAdapterDescription.includedLocales = protocol.includedLocales;
+ // newAdapterDescription.includesLocales = protocol.includesLocales;
+ //
+ // if (
+ // newAdapterDescription instanceof GenericAdapterSetDescription ||
+ // newAdapterDescription instanceof GenericAdapterStreamDescription
+ // ) {
+ // newAdapterDescription.protocolDescription = protocol;
+ // }
+ // adapterDescriptions.push(newAdapterDescription);
+ // }
+ // return adapterDescriptions;
+ // }));
+ // }
// sortStaticProperties(sp: StaticProperty) {
// if (sp instanceof AlternativesStaticProperty) {
@@ -182,9 +171,9 @@ export class DataMarketplaceService {
// }
// }
- getGenericAndSpecificAdapterDescriptions(): Observable<[AdapterDescriptionUnion[], AdapterDescriptionUnion[]]> {
- return zip(this.getAdapterDescriptions(), this.getProtocols());
- }
+ // getGenericAndSpecificAdapterDescriptions(): Observable<[AdapterDescriptionUnion[], AdapterDescriptionUnion[]]> {
+ // return zip(this.getAdapterDescriptions(), this.getProtocols());
+ // }
cloneAdapterDescription(toClone: AdapterDescriptionUnion): AdapterDescriptionUnion {
let result: AdapterDescriptionUnion;