You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/05 19:38:54 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-438] Start to move
restart of adapters to AdapterHealthCheck
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 676ff84a73b61553d77cb6fc9b6335d937d27660
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Oct 5 21:38:03 2021 +0200
[STREAMPIPES-438] Start to move restart of adapters to AdapterHealthCheck
---
.../master/health/AdapterHealthCheck.java | 65 +++++++++++++
.../master/management/AdapterMasterManagement.java | 59 +++++++++---
.../management/WorkerAdministrationManagement.java | 107 +++++++++++----------
.../master/util/AdapterEncryptionService.java | 7 +-
.../model/connect/adapter/AdapterDescription.java | 3 +-
.../connect/worker/ConnectWorkerContainer.java | 1 +
streampipes-pipeline-management/pom.xml | 2 +-
.../rest/impl/connect/AdapterResource.java | 9 +-
.../impl/connect/WorkerAdministrationResource.java | 2 +-
9 files changed, 175 insertions(+), 80 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
new file mode 100644
index 0000000..63e49c4
--- /dev/null
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.container.master.health;
+
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+
+import java.util.List;
+
+public class AdapterHealthCheck {
+
+ public AdapterHealthCheck() {
+ }
+
+ public void checkAndRestoreAdapters() {
+ AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
+ IAdapterStorage adapterStorage = new AdapterStorageImpl();
+
+
+ // Get all adapters
+ List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
+
+ // Group them by worker
+// AdapterDescription decryptedAdapterDescription =
+// new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
+
+ for (AdapterDescription adapterDescription : allRunningInstancesAdaperDescription) {
+ try {
+ adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+ } catch (AdapterException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ // Ask worker if they are up and running
+
+ // If not
+
+ // Find a worker to run them
+
+ // Invoke the adapters
+
+ }
+
+}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 993a88b..98623be 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.connect.adapter.GroundingService;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.storage.UserService;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.model.SpDataStream;
@@ -36,9 +37,12 @@ import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
import org.apache.streampipes.storage.management.StorageManager;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import java.util.UUID;
@@ -138,22 +142,27 @@ public class AdapterMasterManagement {
throw new AdapterException("Could not find adapter with id: " + id);
}
+ /**
+ * First the adapter is stopped and removed, then the corresponding data source is deleted
+ * @param elementId
+ * @throws AdapterException
+ */
public void deleteAdapter(String elementId) throws AdapterException {
// IF Stream adapter delete it
boolean isStreamAdapter = isStreamAdapter(elementId);
- AdapterDescription ad = adapterStorage.getAdapter(elementId);
if (isStreamAdapter) {
try {
- stopStreamAdapter(elementId, ad.getSelectedEndpointUrl());
+ stopStreamAdapter(elementId);
} catch (AdapterException e) {
LOG.info("Could not stop adapter: " + elementId);
LOG.info(e.toString());
}
}
- String username = ad.getUserName();
+ AdapterDescription ad = adapterStorage.getAdapter(elementId);
+ String username = ad.getUserName();
adapterStorage.deleteAdapter(elementId);
LOG.info("Successfully deleted adapter: " + elementId);
@@ -185,35 +194,55 @@ public class AdapterMasterManagement {
WorkerRestClient.stopSetAdapter(baseUrl, ad);
}
- public void stopStreamAdapter(String elementId, String baseUrl) throws AdapterException {
+ public void stopStreamAdapter(String elementId) throws AdapterException {
AdapterDescription ad = adapterStorage.getAdapter(elementId);
if (!isStreamAdapter(elementId)) {
throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
} else {
- WorkerRestClient.stopStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
+ WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), (AdapterStreamDescription) ad);
}
}
- public void startStreamAdapter(String adapterId, String baseUrl) throws AdapterException {
- AdapterDescription ad = adapterStorage.getAdapter(adapterId);
- if (!isStreamAdapter(adapterId)) {
- throw new AdapterException("Adapter " + adapterId + "is not a stream adapter.");
- } else {
- ad.setSelectedEndpointUrl(baseUrl);
- adapterStorage.updateAdapter(ad);
- WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
+ public void startStreamAdapter(String elementId) throws AdapterException {
+ // TODO ensure that adapter is not started twice
+
+ AdapterDescription ad = adapterStorage.getAdapter(elementId);
+ try {
+ String endpointUrl = findEndpointUrl(ad);
+ URI uri = new URI(endpointUrl);
+ String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+ if (!isStreamAdapter(elementId)) {
+ throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
+ } else {
+ ad.setSelectedEndpointUrl(baseUrl);
+ adapterStorage.updateAdapter(ad);
+ WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
+ }
+ } catch (NoServiceEndpointsAvailableException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
}
+
+
}
public boolean isStreamAdapter(String id) {
- AdapterDescription ad = adapterStorage.getAdapter(id);
+ AdapterDescription adapterDescription = adapterStorage.getAdapter(id);
+ return isStreamAdapter(adapterDescription);
+ }
- return ad instanceof AdapterStreamDescription;
+ public boolean isStreamAdapter(AdapterDescription adapterDescription) {
+ return adapterDescription instanceof AdapterStreamDescription;
}
private IAdapterStorage getAdapterStorage() {
return new AdapterStorageImpl();
}
+ private String findEndpointUrl(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
+ SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
+ return new ExtensionsServiceEndpointGenerator(adapterDescription.getAppId(), serviceUrlProvider).getEndpointResourceUrl();
+ }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index 097a0f4..2df8f95 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -18,17 +18,12 @@
package org.apache.streampipes.connect.container.master.management;
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.connect.container.master.health.AdapterHealthCheck;
import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
public class WorkerAdministrationManagement {
private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
@@ -36,61 +31,69 @@ public class WorkerAdministrationManagement {
private ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage;
private AdapterMasterManagement adapterMasterManagement;
+ private AdapterHealthCheck adapterHealthCheck;
+
public WorkerAdministrationManagement() {
this.connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
this.adapterMasterManagement = new AdapterMasterManagement();
+ this.adapterHealthCheck = new AdapterHealthCheck();
}
- // TODO refactor and test this function
public void register(ConnectWorkerContainer connectWorker) {
- // Check if already registered
-
- List<ConnectWorkerContainer> allConnectWorkerContainers =
- this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
+ // TODO how do I register the protocols and adapters of a worker?
- boolean alreadyRegistered = false;
+ this.adapterHealthCheck.checkAndRestoreAdapters();
- // Delete old description if it was registred before
- for (ConnectWorkerContainer c : allConnectWorkerContainers) {
- if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
- boolean adaptersChanged = false;
-
- for (AdapterDescription a : c.getAdapters()) {
- if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getElementId().equals(a.getElementId()))) {
- adaptersChanged = true;
- }
- }
-
- for (ProtocolDescription p : c.getProtocols()) {
- if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
- adaptersChanged = true;
- }
- }
-
- if (!adaptersChanged) {
- alreadyRegistered = true;
- } else {
- LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
- this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
- }
- }
- }
+ // Check if already registered
- // TODO I am not sure if this is correct
- // IF NOT REGISTERED
- // Store Connect Worker in DB
- if (!alreadyRegistered) {
- this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
- LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
- } else {
- try {
- this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
- } catch (AdapterException e) {
- LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
- } catch (NoServiceEndpointsAvailableException e) {
- LOG.error("Could not start adapter due to missing endpoint");
- }
- }
+// List<ConnectWorkerContainer> allConnectWorkerContainers =
+// this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
+//
+// boolean alreadyRegistered = false;
+//
+// // Delete old description if it was registered before
+// for (ConnectWorkerContainer c : allConnectWorkerContainers) {
+// if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
+// boolean adaptersChanged = false;
+//
+// for (AdapterDescription a : c.getAdapters()) {
+// if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getElementId().equals(a.getElementId()))) {
+// adaptersChanged = true;
+// }
+// }
+//
+// for (ProtocolDescription p : c.getProtocols()) {
+// if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
+// adaptersChanged = true;
+// }
+// }
+//
+// if (!adaptersChanged) {
+// alreadyRegistered = true;
+// } else {
+// LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
+// this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
+// }
+// }
+// }
+//
+//
+//
+// // TODO I am not sure if this is correct
+// // IF NOT REGISTERED
+// // Store Connect Worker in DB
+// if (!alreadyRegistered) {
+// this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
+// LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
+// } else {
+// try {
+// this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
+// } catch (AdapterException e) {
+// LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
+// } catch (NoServiceEndpointsAvailableException e) {
+// LOG.error("Could not start adapter due to missing endpoint");
+// }
+// }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
index 984cf58..1f61c01 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
@@ -17,7 +17,6 @@
*/
package org.apache.streampipes.connect.container.master.util;
-import org.apache.streampipes.manager.secret.SecretProvider;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
import org.apache.streampipes.model.staticproperty.StaticProperty;
@@ -56,10 +55,12 @@ public class AdapterEncryptionService {
}
private void encrypt(List<StaticProperty> staticProperties) {
- SecretProvider.getEncryptionService(ad.getUserName()).applyConfig(staticProperties);
+ // TODO uncomment: encryption is disabled for now, so the static properties are left unchanged
+// SecretProvider.getEncryptionService(ad.getUserName()).applyConfig(staticProperties);
}
private void decrypt(List<StaticProperty> staticProperties) {
- SecretProvider.getDecryptionService(ad.getUserName()).applyConfig(staticProperties);
+ // TODO uncomment: decryption is disabled for now, so the static properties are left unchanged
+// SecretProvider.getDecryptionService(ad.getUserName()).applyConfig(staticProperties);
}
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index e76574d..eeb9189 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -42,8 +42,6 @@ import java.util.List;
@TsModel
public abstract class AdapterDescription extends NamedStreamPipesEntity {
-// private String adapterId;
-
private String userName;
private EventGrounding eventGrounding;
@@ -60,6 +58,7 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
private long createdAt;
+ // Stores the endpoint URL the adapter was started on, so that it can be stopped there
private String selectedEndpointUrl;
private String correspondingServiceGroup;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
index d6b2cff..b449c5b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.model.util.ElementIdGenerator;
import java.util.ArrayList;
import java.util.List;
+@Deprecated
public class ConnectWorkerContainer extends UnnamedStreamPipesEntity {
private @SerializedName("_rev") String rev;
diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml
index 1324eb9..89f6294 100644
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@ -42,7 +42,7 @@
<artifactId>streampipes-container</artifactId>
<version>0.69.0-SNAPSHOT</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-dataformat-cbor</artifactId>
<version>0.69.0-SNAPSHOT</version>
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index bbebb2e..0831269 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -87,9 +87,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
@Produces(MediaType.APPLICATION_JSON)
public Response stopAdapter(@PathParam("id") String adapterId) throws NoServiceEndpointsAvailableException {
try {
- AdapterDescription adapterDescription = getAdapterDescription(adapterId);
- String workerBaseUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
- managementService.stopStreamAdapter(adapterId, workerBaseUrl);
+ managementService.stopStreamAdapter(adapterId);
return ok(Notifications.success("Adapter started"));
} catch (AdapterException e) {
LOG.error("Could not stop adapter with id " +adapterId, e);
@@ -103,10 +101,9 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
@Produces(MediaType.APPLICATION_JSON)
public Response startAdapter(@PathParam("id") String adapterId) {
try {
- String workerUrl = workerUrlProvider.getWorkerBaseUrl(getAdapterDescription(adapterId).getAppId());
- managementService.startStreamAdapter(adapterId, workerUrl);
+ managementService.startStreamAdapter(adapterId);
return ok(Notifications.success("Adapter stopped"));
- } catch (AdapterException | NoServiceEndpointsAvailableException e) {
+ } catch (AdapterException e) {
LOG.error("Could not start adapter with id " +adapterId, e);
return ok(Notifications.error(e.getMessage()));
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index c30fddf..bfd2979 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -50,7 +50,7 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
this.workerAdministrationManagement.register(connectWorkerContainer);
- return ok(Notifications.success("Worker Container sucessfully added"));
+ return ok(Notifications.success("Worker Container successfully added"));
}
}