You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/07 10:27:47 UTC

[incubator-streampipes] 01/03: Remove ConnectWorkerDescription from client

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f326f391ab49795fd3d567d0ebd2c78a1013a1ad
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Oct 6 09:50:23 2021 +0200

    Remove ConnectWorkerDescription from client
---
 .../master/health/AdapterHealthCheck.java          |  3 +--
 .../master/management/AdapterMasterManagement.java |  5 ++++-
 .../init/ConnectWorkerDescriptionProvider.java     | 25 ++++++++++++++++------
 .../worker/management/MasterRestClient.java        |  7 +++---
 .../standalone/init/StandaloneModelSubmitter.java  |  1 +
 .../impl/connect/WorkerAdministrationResource.java | 12 +++++++----
 6 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 63e49c4..89bdd64 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -40,11 +40,10 @@ public class AdapterHealthCheck {
         List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
 
         // Group them by worker
-//        AdapterDescription decryptedAdapterDescription =
-//                new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
 
         for (AdapterDescription adapterDescription : allRunningInstancesAdaperDescription) {
             try {
+
                 adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
             } catch (AdapterException e) {
                 e.printStackTrace();
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 98623be..77342c5 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -217,7 +217,10 @@ public class AdapterMasterManagement {
       } else {
         ad.setSelectedEndpointUrl(baseUrl);
         adapterStorage.updateAdapter(ad);
-        WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
+
+        AdapterDescription decryptedAdapterDescription =
+                new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
+        WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) decryptedAdapterDescription);
       }
     } catch (NoServiceEndpointsAvailableException e) {
       e.printStackTrace();
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index 1ec7506..0b48a29 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -23,8 +23,11 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.locales.LabelGenerator;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.grounding.ProtocolSetDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolStreamDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +39,7 @@ public class ConnectWorkerDescriptionProvider {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerDescriptionProvider.class);
 
-  public ConnectWorkerContainer getContainerDescription(String serviceGroup) {
+  public List<AdapterDescription> getContainerDescription(String serviceGroup) {
 
     List<AdapterDescription> adapters = new ArrayList<>();
     for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
@@ -44,13 +47,23 @@ public class ConnectWorkerDescriptionProvider {
       adapters.add(desc);
     }
 
-    List<ProtocolDescription> protocols = new ArrayList<>();
     for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
-      ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel());
-      protocols.add(desc);
+      ProtocolDescription protocolDescription = (ProtocolDescription) rewrite(p.declareModel());
+
+      if (protocolDescription instanceof ProtocolStreamDescription) {
+        GenericAdapterStreamDescription desc = new GenericAdapterStreamDescription();
+        desc.setAppId(protocolDescription.getAppId());
+        desc.setProtocolDescription(protocolDescription);
+        adapters.add(desc);
+      } else if (protocolDescription instanceof ProtocolSetDescription) {
+        GenericAdapterSetDescription desc = new GenericAdapterSetDescription();
+        desc.setAppId(protocolDescription.getAppId());
+        desc.setProtocolDescription(protocolDescription);
+        adapters.add(desc);
+      }
     }
 
-    return new ConnectWorkerContainer(serviceGroup, protocols, adapters);
+    return adapters;
   }
 
   private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity) {
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
index 360e758..5c7fbdd 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
@@ -20,23 +20,24 @@ package org.apache.streampipes.connect.container.worker.management;
 
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 public class MasterRestClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(MasterRestClient.class);
 
-    public static boolean register(String baseUrl, ConnectWorkerContainer connectWorkerContainer) {
+    public static boolean register(String baseUrl, List<AdapterDescription> allAvailableAdapters) {
 
         String url = baseUrl + "/api/v2/connect/admin@streampipes.org/master/administration";
 
         try {
-            String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(connectWorkerContainer);
+            String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(allAvailableAdapters);
 
             Request.Post(url)
                     .bodyString(adapterDescription, ContentType.APPLICATION_JSON)
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 1738887..14ccd16 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -34,6 +34,7 @@ import org.springframework.context.annotation.Import;
 import java.net.UnknownHostException;
 import java.util.List;
 
+@Deprecated
 @Configuration
 @EnableAutoConfiguration
 @Import({ PipelineElementContainerResourceConfig.class })
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index bfd2979..a225b8c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.rest.impl.connect;
 
 import org.apache.streampipes.connect.container.master.management.WorkerAdministrationManagement;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
@@ -31,6 +31,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.util.List;
 
 @Path("v2/connect/{username}/master/administration")
 public class WorkerAdministrationResource extends AbstractSharedRestInterface {
@@ -46,9 +47,12 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
     @POST
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
-    public Response addWorkerContainer(ConnectWorkerContainer connectWorkerContainer) {
-        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
-        this.workerAdministrationManagement.register(connectWorkerContainer);
+    public Response addWorkerContainer(List<AdapterDescription> availableAdapterDescription) {
+        // Change this to List<AdapterDescription>
+        // How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
+//        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
+//        this.workerAdministrationManagement.register(connectWorkerContainer);
+        System.out.println(availableAdapterDescription);
 
         return ok(Notifications.success("Worker Container successfully added"));
     }