You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/07 10:27:47 UTC
[incubator-streampipes] 01/03: Remove ConnectWorkerDescription from client
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit f326f391ab49795fd3d567d0ebd2c78a1013a1ad
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Oct 6 09:50:23 2021 +0200
Remove ConnectWorkerDescription from client
---
.../master/health/AdapterHealthCheck.java | 3 +--
.../master/management/AdapterMasterManagement.java | 5 ++++-
.../init/ConnectWorkerDescriptionProvider.java | 25 ++++++++++++++++------
.../worker/management/MasterRestClient.java | 7 +++---
.../standalone/init/StandaloneModelSubmitter.java | 1 +
.../impl/connect/WorkerAdministrationResource.java | 12 +++++++----
6 files changed, 37 insertions(+), 16 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 63e49c4..89bdd64 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -40,11 +40,10 @@ public class AdapterHealthCheck {
List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
// Group them by worker
-// AdapterDescription decryptedAdapterDescription =
-// new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
for (AdapterDescription adapterDescription : allRunningInstancesAdaperDescription) {
try {
+
adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
} catch (AdapterException e) {
e.printStackTrace();
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 98623be..77342c5 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -217,7 +217,10 @@ public class AdapterMasterManagement {
} else {
ad.setSelectedEndpointUrl(baseUrl);
adapterStorage.updateAdapter(ad);
- WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
+
+ AdapterDescription decryptedAdapterDescription =
+ new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
+ WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) decryptedAdapterDescription);
}
} catch (NoServiceEndpointsAvailableException e) {
e.printStackTrace();
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index 1ec7506..0b48a29 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -23,8 +23,11 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.locales.LabelGenerator;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.grounding.ProtocolSetDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolStreamDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +39,7 @@ public class ConnectWorkerDescriptionProvider {
private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerDescriptionProvider.class);
- public ConnectWorkerContainer getContainerDescription(String serviceGroup) {
+ public List<AdapterDescription> getContainerDescription(String serviceGroup) {
List<AdapterDescription> adapters = new ArrayList<>();
for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
@@ -44,13 +47,23 @@ public class ConnectWorkerDescriptionProvider {
adapters.add(desc);
}
- List<ProtocolDescription> protocols = new ArrayList<>();
for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
- ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel());
- protocols.add(desc);
+ ProtocolDescription protocolDescription = (ProtocolDescription) rewrite(p.declareModel());
+
+ if (protocolDescription instanceof ProtocolStreamDescription) {
+ GenericAdapterStreamDescription desc = new GenericAdapterStreamDescription();
+ desc.setAppId(protocolDescription.getAppId());
+ desc.setProtocolDescription(protocolDescription);
+ adapters.add(desc);
+ } else if (protocolDescription instanceof ProtocolSetDescription) {
+ GenericAdapterSetDescription desc = new GenericAdapterSetDescription();
+ desc.setAppId(protocolDescription.getAppId());
+ desc.setProtocolDescription(protocolDescription);
+ adapters.add(desc);
+ }
}
- return new ConnectWorkerContainer(serviceGroup, protocols, adapters);
+ return adapters;
}
private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity) {
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
index 360e758..5c7fbdd 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
@@ -20,23 +20,24 @@ package org.apache.streampipes.connect.container.worker.management;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
public class MasterRestClient {
private static final Logger LOG = LoggerFactory.getLogger(MasterRestClient.class);
- public static boolean register(String baseUrl, ConnectWorkerContainer connectWorkerContainer) {
+ public static boolean register(String baseUrl, List<AdapterDescription> allAvailableAdapters) {
String url = baseUrl + "/api/v2/connect/admin@streampipes.org/master/administration";
try {
- String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(connectWorkerContainer);
+ String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(allAvailableAdapters);
Request.Post(url)
.bodyString(adapterDescription, ContentType.APPLICATION_JSON)
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 1738887..14ccd16 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -34,6 +34,7 @@ import org.springframework.context.annotation.Import;
import java.net.UnknownHostException;
import java.util.List;
+@Deprecated
@Configuration
@EnableAutoConfiguration
@Import({ PipelineElementContainerResourceConfig.class })
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index bfd2979..a225b8c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.rest.impl.connect;
import org.apache.streampipes.connect.container.master.management.WorkerAdministrationManagement;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
@@ -31,6 +31,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.util.List;
@Path("v2/connect/{username}/master/administration")
public class WorkerAdministrationResource extends AbstractSharedRestInterface {
@@ -46,9 +47,12 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
@POST
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
- public Response addWorkerContainer(ConnectWorkerContainer connectWorkerContainer) {
- LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
- this.workerAdministrationManagement.register(connectWorkerContainer);
+ public Response addWorkerContainer(List<AdapterDescription> availableAdapterDescription) {
+ // Change this to List<AdapterDescription>
+ // How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
+// LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
+// this.workerAdministrationManagement.register(connectWorkerContainer);
+ System.out.println(availableAdapterDescription);
return ok(Notifications.success("Worker Container successfully added"));
}