You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2024/03/27 09:17:14 UTC
(streampipes) 01/02: refactor: Improve extension installation process
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch improve-endpoint-resolution
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 425e7fb733a056a29c9cc67e6d3f68533cc152c6
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Mar 27 09:29:57 2024 +0100
refactor: Improve extension installation process
---
.../client/endpoint/ExtensionsServiceEndpoint.java | 61 --------
.../model/base/NamedStreamPipesEntity.java | 24 ++++
.../model/extensions/ExtensionAsset.java | 5 +-
.../model/extensions/ExtensionItemDescription.java | 38 ++---
.../ExtensionItemInstallationRequest.java | 22 ++-
.../svcdiscovery/SpServiceRegistration.java | 41 +++---
.../extensions/svcdiscovery/SpServiceTag.java | 2 +-
.../svcdiscovery/SpServiceTagPrefix.java | 19 +--
.../model}/util/ServiceDefinitionUtil.java | 36 +----
.../api/extensions/IExtensionsProvider.java | 12 +-
.../extensions/IExtensionsResourceUrlProvider.java | 16 ++-
.../endpoint/AvailableExtensionsProvider.java | 108 ++++++++++++++
.../manager/endpoint/EndpointFetcher.java | 40 ------
.../manager/endpoint/EndpointItemFetcher.java | 70 ----------
.../manager/endpoint/EndpointItemParser.java | 53 -------
.../manager/endpoint/ExtensionItemInstaller.java | 57 ++++++++
.../endpoint/ExtensionsResourceUrlProvider.java | 74 ++++++++++
.../manager/endpoint/HttpJsonParser.java | 54 -------
.../migration/AbstractMigrationManager.java | 7 +-
.../pipeline/ExtensionsServiceLogExecutor.java | 3 +-
.../streampipes/manager/operations/Operations.java | 7 -
.../manager/setup/ExtensionsInstallationTask.java | 32 +++--
.../manager/setup/InstallationConfiguration.java | 3 +-
.../setup/PipelineElementInstallationStep.java | 66 +++------
.../rest/core/base/impl/AbstractRestResource.java | 16 ---
...rt.java => ExtensionsInstallationResource.java} | 73 +++-------
.../admin/ExtensionsServiceEndpointResource.java | 155 +++------------------
.../org/apache/streampipes/sdk/utils/Assets.java | 10 +-
.../api/model/SpServicePathPrefix.java | 1 -
.../api/model/SpServiceUrlProvider.java | 39 ++----
.../svcdiscovery/SpServiceDiscoveryCore.java | 6 +-
.../extensions/ExtensionsModelSubmitter.java | 54 +------
.../service/extensions/ServiceTagProvider.java | 58 --------
.../StreamPipesExtensionsServiceBase.java | 87 ++++++++++--
.../api/IExtensionsServiceEndpointStorage.java | 32 -----
...ervice.ts => extension-installation.service.ts} | 30 ++--
.../src/lib/model/gen/streampipes-model.ts | 68 ++++++++-
.../platform-services/src/public-api.ts | 2 +-
ui/src/app/add/add.component.html | 14 +-
ui/src/app/add/add.component.ts | 40 +++---
ui/src/app/add/add.module.ts | 10 ++
.../endpoint-item/endpoint-item.component.html | 6 +-
.../endpoint-item/endpoint-item.component.ts | 73 +++++-----
.../endpoint-installation.component.ts | 38 +++--
ui/src/app/add/filter/order-by.pipe.ts | 7 +-
.../pipeline-element-installation-status.pipe.ts | 6 +-
.../app/add/filter/pipeline-element-name.pipe.ts | 6 +-
.../app/add/filter/pipeline-element-type.pipe.ts | 10 +-
ui/src/app/add/services/add.service.ts | 16 +--
49 files changed, 740 insertions(+), 967 deletions(-)
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
deleted file mode 100644
index b4d34e4013..0000000000
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.model.client.endpoint;
-
-import com.google.gson.annotations.SerializedName;
-
-public class ExtensionsServiceEndpoint {
-
- private @SerializedName("_id") String id;
- private @SerializedName("_rev") String rev;
-
- private String endpointUrl;
-
- public ExtensionsServiceEndpoint() {
-
- }
-
- public ExtensionsServiceEndpoint(String endpointUrl) {
- this.endpointUrl = endpointUrl;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getRev() {
- return rev;
- }
-
- public void setRev(String rev) {
- this.rev = rev;
- }
-
- public String getEndpointUrl() {
- return endpointUrl;
- }
-
- public void setEndpointUrl(String endpointUrl) {
- this.endpointUrl = endpointUrl;
- }
-}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
index 30981732f4..bd10263914 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/NamedStreamPipesEntity.java
@@ -19,7 +19,11 @@
package org.apache.streampipes.model.base;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.extensions.ExtensionAsset;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
import org.apache.streampipes.model.shared.annotation.TsModel;
+import org.apache.streampipes.model.util.ServiceDefinitionUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -208,6 +212,26 @@ public abstract class NamedStreamPipesEntity implements Serializable {
this.elementId = elementId;
}
+  /**
+   * Creates the extension description that represents this entity.
+   *
+   * @param installed value stored in the description's installed flag
+   * @param editable  value stored in the description's editable flag
+   * @param available value stored in the description's available flag
+   * @return a new description carrying this entity's name, description, element ID, service tag
+   *         prefix and icon/documentation flags; the app ID is only set when this entity is not a
+   *         data stream
+   */
+  public ExtensionItemDescription toExtensionDescription(boolean installed,
+                                                         boolean editable,
+                                                         boolean available) {
+    // Resolve the prefix first: it throws for entity types without a matching service tag prefix.
+    var prefix = ServiceDefinitionUtil.getPrefix(this);
+    boolean hasIcon = isIncludesAssets() && getIncludedAssets().contains(ExtensionAsset.ICON);
+    boolean hasDocs = isIncludesAssets() && getIncludedAssets().contains(ExtensionAsset.DOCUMENTATION);
+
+    ExtensionItemDescription item = new ExtensionItemDescription();
+    item.setName(getName());
+    item.setDescription(getDescription());
+    item.setElementId(getElementId());
+    item.setServiceTagPrefix(prefix);
+    item.setInstalled(installed);
+    item.setEditable(editable);
+    item.setAvailable(available);
+    item.setIncludesIcon(hasIcon);
+    item.setIncludesDocs(hasDocs);
+
+    if (this instanceof SpDataStream) {
+      return item;
+    }
+    item.setAppId(getAppId());
+    return item;
+  }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionAsset.java
similarity index 92%
copy from streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionAsset.java
index f9205ff835..a596aec3e8 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionAsset.java
@@ -15,9 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.sdk.utils;
-public class Assets {
+package org.apache.streampipes.model.extensions;
+
+public class ExtensionAsset {
public static final String DOCUMENTATION = "documentation.md";
public static final String ICON = "icon.png";
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemDescription.java
similarity index 78%
rename from streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemDescription.java
index 739ea9dde8..68c0885011 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemDescription.java
@@ -16,20 +16,18 @@
*
*/
-package org.apache.streampipes.model.client.endpoint;
+package org.apache.streampipes.model.extensions;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.shared.annotation.TsModel;
-import java.util.List;
-
@TsModel
-public class ExtensionsServiceEndpointItem {
+public class ExtensionItemDescription {
private String name;
private String description;
private String elementId;
- private String uri;
- private String type;
+ private SpServiceTagPrefix serviceTagPrefix;
private String appId;
private boolean includesIcon;
@@ -39,9 +37,7 @@ public class ExtensionsServiceEndpointItem {
private boolean editable;
private boolean available;
- private List<ExtensionsServiceEndpointItem> streams;
-
- public ExtensionsServiceEndpointItem() {
+ public ExtensionItemDescription() {
}
@@ -61,22 +57,6 @@ public class ExtensionsServiceEndpointItem {
this.description = description;
}
- public String getUri() {
- return uri;
- }
-
- public void setUri(String uri) {
- this.uri = uri;
- }
-
- public List<ExtensionsServiceEndpointItem> getStreams() {
- return streams;
- }
-
- public void setStreams(List<ExtensionsServiceEndpointItem> streams) {
- this.streams = streams;
- }
-
public boolean isInstalled() {
return installed;
}
@@ -85,12 +65,12 @@ public class ExtensionsServiceEndpointItem {
this.installed = installed;
}
- public String getType() {
- return type;
+ public SpServiceTagPrefix getServiceTagPrefix() {
+ return serviceTagPrefix;
}
- public void setType(String type) {
- this.type = type;
+ public void setServiceTagPrefix(SpServiceTagPrefix serviceTagPrefix) {
+ this.serviceTagPrefix = serviceTagPrefix;
}
public String getAppId() {
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemInstallationRequest.java
similarity index 56%
rename from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemInstallationRequest.java
index b2b5091805..096ba3c12c 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/ExtensionItemInstallationRequest.java
@@ -15,15 +15,23 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api.model;
-import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
+package org.apache.streampipes.model.extensions;
+
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.shared.annotation.TsModel;
-public class DefaultSpServiceTags {
+@TsModel
+public record ExtensionItemInstallationRequest(String appId,
+ SpServiceTagPrefix serviceTagPrefix,
+ boolean publicElement) {
- public static final SpServiceTag CORE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "core");
- public static final SpServiceTag PE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "pe");
- public static final SpServiceTag CONNECT_WORKER = SpServiceTag
- .create(SpServiceTagPrefix.SYSTEM, "connect-worker");
+ public static ExtensionItemInstallationRequest fromDescription(ExtensionItemDescription itemDescription,
+ boolean publicElement) {
+ return new ExtensionItemInstallationRequest(
+ itemDescription.getAppId(),
+ itemDescription.getServiceTagPrefix(),
+ publicElement
+ );
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java
index 8edd9932fa..fc01712a8c 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java
@@ -17,11 +17,12 @@
*/
package org.apache.streampipes.model.extensions.svcdiscovery;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
import org.apache.streampipes.model.shared.annotation.TsModel;
import com.google.gson.annotations.SerializedName;
-import java.util.List;
+import java.util.Set;
@TsModel
public class SpServiceRegistration {
@@ -34,11 +35,13 @@ public class SpServiceRegistration {
private String scheme = "http";
private String host;
private int port;
- private List<SpServiceTag> tags;
+ private Set<SpServiceTag> tags;
private String healthCheckPath;
private long firstTimeSeenUnhealthy = 0;
private SpServiceStatus status = SpServiceStatus.REGISTERED;
+ private Set<ExtensionItemDescription> providedExtensions;
+
public SpServiceRegistration() {
}
@@ -47,8 +50,9 @@ public class SpServiceRegistration {
String svcId,
String host,
int port,
- List<SpServiceTag> tags,
- String healthCheckPath) {
+ Set<SpServiceTag> tags,
+ String healthCheckPath,
+ Set<ExtensionItemDescription> providedExtensions) {
this.svcType = svcType;
this.svcGroup = svcGroup;
this.svcId = svcId;
@@ -56,6 +60,7 @@ public class SpServiceRegistration {
this.port = port;
this.tags = tags;
this.healthCheckPath = healthCheckPath;
+ this.providedExtensions = providedExtensions;
}
public static SpServiceRegistration from(String svcType,
@@ -63,18 +68,10 @@ public class SpServiceRegistration {
String svcId,
String host,
Integer port,
- List<SpServiceTag> tags) {
- return new SpServiceRegistration(svcType, svcGroup, svcId, host, port, tags, "");
- }
-
- public static SpServiceRegistration from(String svcType,
- String svcGroup,
- String svcId,
- String host,
- Integer port,
- List<SpServiceTag> tags,
- String healthCheckPath) {
- return new SpServiceRegistration(svcType, svcGroup, svcId, host, port, tags, healthCheckPath);
+ Set<SpServiceTag> tags,
+ String healthCheckPath,
+ Set<ExtensionItemDescription> providedExtensions) {
+ return new SpServiceRegistration(svcType, svcGroup, svcId, host, port, tags, healthCheckPath, providedExtensions);
}
public String getSvcGroup() {
@@ -109,11 +106,11 @@ public class SpServiceRegistration {
this.port = port;
}
- public List<SpServiceTag> getTags() {
+ public Set<SpServiceTag> getTags() {
return tags;
}
- public void setTags(List<SpServiceTag> tags) {
+ public void setTags(Set<SpServiceTag> tags) {
this.tags = tags;
}
@@ -168,4 +165,12 @@ public class SpServiceRegistration {
public void setStatus(SpServiceStatus status) {
this.status = status;
}
+
+ public Set<ExtensionItemDescription> getProvidedExtensions() {
+ return providedExtensions;
+ }
+
+ public void setProvidedExtensions(Set<ExtensionItemDescription> providedExtensions) {
+ this.providedExtensions = providedExtensions;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java
index 15feea7113..71b7fe428e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java
@@ -38,7 +38,7 @@ public class SpServiceTag {
}
public String asString() {
- return prefix.asString() + COLON + value;
+ return prefix + COLON + value;
}
public SpServiceTagPrefix getPrefix() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java
index 346e0b2224..f412bdbe6f 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java
@@ -18,20 +18,13 @@
package org.apache.streampipes.model.extensions.svcdiscovery;
public enum SpServiceTagPrefix {
- SYSTEM("sys"),
- SP_GROUP("spgroup"),
- ADAPTER("adapter"),
- DATA_STREAM("dstream"),
- DATA_PROCESSOR("dprocessor"),
- DATA_SINK("dsink");
-
- private String prefix;
-
- SpServiceTagPrefix(String prefix) {
- this.prefix = prefix;
- }
+ SP_GROUP,
+ ADAPTER,
+ DATA_STREAM,
+ DATA_PROCESSOR,
+ DATA_SINK;
public String asString() {
- return this.prefix;
+ return this.name();
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/ServiceDefinitionUtil.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/ServiceDefinitionUtil.java
similarity index 57%
rename from streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/ServiceDefinitionUtil.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/util/ServiceDefinitionUtil.java
index e89858528c..9918e628bf 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/ServiceDefinitionUtil.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/ServiceDefinitionUtil.java
@@ -15,54 +15,28 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.extensions.management.util;
+package org.apache.streampipes.model.util;
-import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
-import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
public class ServiceDefinitionUtil {
- public static List<SpServiceTag> extractAppIds(Collection<IStreamPipesPipelineElement<?>> declarers) {
- return declarers
- .stream()
- .map(d -> {
- var model = d.declareConfig().getDescription();
- return SpServiceTag.create(getPrefix(model), model.getAppId());
- })
- .collect(Collectors.toList());
- }
-
- private static SpServiceTagPrefix getPrefix(NamedStreamPipesEntity entity) {
+ public static SpServiceTagPrefix getPrefix(NamedStreamPipesEntity entity) {
if (entity instanceof SpDataStream) {
return SpServiceTagPrefix.DATA_STREAM;
} else if (entity instanceof DataProcessorDescription) {
return SpServiceTagPrefix.DATA_PROCESSOR;
} else if (entity instanceof DataSinkDescription) {
return SpServiceTagPrefix.DATA_SINK;
+ } else if (entity instanceof AdapterDescription) {
+ return SpServiceTagPrefix.ADAPTER;
} else {
throw new RuntimeException("Could not find service tag for entity " + entity.getClass().getSimpleName());
}
}
-
- public static List<SpServiceTag> extractAppIdsFromAdapters(Collection<StreamPipesAdapter> adapters) {
- return adapters
- .stream()
- .map(d -> SpServiceTag.create(
- SpServiceTagPrefix.ADAPTER,
- d.declareConfig().getAdapterDescription().getAppId()
- )
- )
- .collect(Collectors.toList());
- }
-
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsProvider.java
similarity index 76%
copy from streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsProvider.java
index f9205ff835..b79f10b0bb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsProvider.java
@@ -15,10 +15,14 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.sdk.utils;
-public class Assets {
+package org.apache.streampipes.manager.api.extensions;
- public static final String DOCUMENTATION = "documentation.md";
- public static final String ICON = "icon.png";
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+
+import java.util.List;
+
+public interface IExtensionsProvider {
+
+ List<ExtensionItemDescription> getExtensions();
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsResourceUrlProvider.java
similarity index 66%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsResourceUrlProvider.java
index 573563a325..e724ef5427 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsResourceUrlProvider.java
@@ -15,14 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.svcdiscovery.api.model;
-public class SpServicePathPrefix {
+package org.apache.streampipes.manager.api.extensions;
- public static final String DATA_PROCESSOR = "sepa";
- public static final String DATA_SINK = "sec";
- public static final String DATA_STREAM = "stream";
- public static final String ADAPTER = "api/v1/worker/adapters";
- public static final String PROTOCOL = "api/v1/worker/protocols";
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
+
+public interface IExtensionsResourceUrlProvider {
+
+ String getDescriptionUrl(ExtensionItemInstallationRequest installationReq);
+
+ String getIconUrl(ExtensionItemDescription endpointItem);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/AvailableExtensionsProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/AvailableExtensionsProvider.java
new file mode 100644
index 0000000000..650d6eec94
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/AvailableExtensionsProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.endpoint;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.storage.api.INoSqlStorage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class AvailableExtensionsProvider {
+
+ private final INoSqlStorage storage;
+
+ public AvailableExtensionsProvider(INoSqlStorage storage) {
+ this.storage = storage;
+ }
+
+ public List<ExtensionItemDescription> getExtensions() {
+ var installedExtensions = getAllInstalledExtensions();
+ var availableExtensions = getAvailableExtensions().stream()
+ .collect(Collectors.toMap(
+ ExtensionItemDescription::getElementId,
+ Function.identity(),
+ (existing, replacement) -> existing)
+ );
+
+ var processedInstalledExtensions = installedExtensions
+ .stream()
+ .peek(extension -> {
+ if (!availableExtensions.containsKey(extension.getElementId())) {
+ extension.setAvailable(false);
+ }
+ })
+ .toList();
+
+ var allExtensions = new ArrayList<>(processedInstalledExtensions);
+ availableExtensions.values()
+ .stream()
+ .filter(extension -> processedInstalledExtensions
+ .stream()
+ .noneMatch(e -> extension.getElementId().equals(e.getElementId())))
+ .forEach(allExtensions::add);
+
+ return allExtensions;
+ }
+
+ private List<ExtensionItemDescription> getAvailableExtensions() {
+ return storage.getExtensionsServiceStorage().getAll()
+ .stream()
+ .filter(service -> service.getStatus() == SpServiceStatus.HEALTHY)
+ .flatMap(service -> service.getProvidedExtensions().stream())
+ .toList();
+ }
+
+ private List<ExtensionItemDescription> getAllInstalledExtensions() {
+ List<NamedStreamPipesEntity> elements = new ArrayList<>();
+ elements.addAll(getAllAdapters());
+ elements.addAll(getAllDataStreams());
+ elements.addAll(getAllDataProcessors());
+ elements.addAll(getAllDataSinks());
+ return elements.stream().map(e -> e.toExtensionDescription(true, !e.isInternallyManaged(), true)).toList();
+ }
+
+ private List<AdapterDescription> getAllAdapters() {
+ return storage.getAdapterDescriptionStorage().getAllAdapters();
+ }
+
+ private List<SpDataStream> getAllDataStreams() {
+ return storage.getDataStreamStorage().getAll()
+ .stream()
+ .filter(stream -> !stream.isInternallyManaged())
+ .toList();
+ }
+
+ private List<DataProcessorDescription> getAllDataProcessors() {
+ return storage.getDataProcessorStorage().getAll();
+ }
+
+ private List<DataSinkDescription> getAllDataSinks() {
+ return storage.getDataSinkStorage().getAll();
+ }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
deleted file mode 100644
index abe3ff2b75..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.endpoint;
-
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class EndpointFetcher {
-
- public List<ExtensionsServiceEndpoint> getEndpoints() {
- List<String> endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePipelineElementEndpoints();
- List<ExtensionsServiceEndpoint> serviceExtensionsServiceEndpoints = new LinkedList<>();
-
- for (String endpoint : endpoints) {
- ExtensionsServiceEndpoint extensionsServiceEndpoint =
- new ExtensionsServiceEndpoint(endpoint);
- serviceExtensionsServiceEndpoints.add(extensionsServiceEndpoint);
- }
-
- return serviceExtensionsServiceEndpoints;
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
deleted file mode 100644
index f7a4974d03..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.endpoint;
-
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class EndpointItemFetcher {
- Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class);
-
- private final List<ExtensionsServiceEndpoint> extensionsServiceEndpoints;
-
- public EndpointItemFetcher(List<ExtensionsServiceEndpoint> extensionsServiceEndpoints) {
- this.extensionsServiceEndpoints = extensionsServiceEndpoints;
- }
-
- public List<ExtensionsServiceEndpointItem> getItems() {
- List<ExtensionsServiceEndpointItem> endpointItems = new ArrayList<>();
- extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
- return endpointItems;
- }
-
- private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
- try {
- String result = ExtensionServiceExecutions
- .extServiceGetRequest(e.getEndpointUrl())
- .execute()
- .returnContent()
- .asString();
-
- return JacksonSerializer.getObjectMapper()
- .readValue(result, new TypeReference<>() {
- });
- } catch (IOException e1) {
- logger.warn(
- "Processing Element Descriptions could not be fetched from endpoint {}: {} ",
- e.getEndpointUrl(),
- e1.getMessage()
- );
- return Collections.emptyList();
- }
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java
deleted file mode 100644
index 9ab4bf48e6..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemParser.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.endpoint;
-
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.NotificationType;
-import org.apache.streampipes.model.message.Notifications;
-
-import java.io.IOException;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
-
-public class EndpointItemParser {
-
- public Message parseAndAddEndpointItem(String url,
- String principalSid,
- boolean publicElement) {
- try {
- url = URLDecoder.decode(url, StandardCharsets.UTF_8);
- String payload = parseURIContent(url);
- return Operations.verifyAndAddElement(payload, principalSid, publicElement);
- } catch (Exception e) {
- e.printStackTrace();
- return Notifications.error(NotificationType.PARSE_ERROR, e.getMessage());
-
- }
- }
-
- private String parseURIContent(String url) throws IOException {
- return ExtensionServiceExecutions
- .extServiceGetRequest(url)
- .execute()
- .returnContent()
- .asString();
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionItemInstaller.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionItemInstaller.java
new file mode 100644
index 0000000000..5a9cd88329
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionItemInstaller.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.endpoint;
+
+import org.apache.streampipes.commons.exceptions.SepaParseException;
+import org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider;
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
+import org.apache.streampipes.model.message.Message;
+
+import java.io.IOException;
+
+public class ExtensionItemInstaller {
+
+ private final IExtensionsResourceUrlProvider urlProvider;
+
+ public ExtensionItemInstaller(IExtensionsResourceUrlProvider urlProvider) {
+ this.urlProvider = urlProvider;
+ }
+
+ public Message installExtension(ExtensionItemInstallationRequest req,
+ String principalSid) throws IOException, SepaParseException {
+ var descriptionUrl = getDescriptionUrl(req);
+ var description = fetchDescription(descriptionUrl);
+ return Operations.verifyAndAddElement(description, principalSid, req.publicElement());
+ }
+
+ public Message updateExtension(ExtensionItemInstallationRequest req) throws IOException, SepaParseException {
+ var descriptionUrl = getDescriptionUrl(req);
+ var description = fetchDescription(descriptionUrl);
+ return Operations.verifyAndUpdateElement(description);
+ }
+
+ private String getDescriptionUrl(ExtensionItemInstallationRequest req) {
+ return urlProvider.getDescriptionUrl(req);
+ }
+
+ private String fetchDescription(String descriptionUrl) throws IOException {
+ return ExtensionServiceExecutions.extServiceGetRequest(descriptionUrl).execute().returnContent().asString();
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionsResourceUrlProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionsResourceUrlProvider.java
new file mode 100644
index 0000000000..60092b5094
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/ExtensionsResourceUrlProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.endpoint;
+
+import org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.util.List;
+
+public class ExtensionsResourceUrlProvider implements IExtensionsResourceUrlProvider {
+
+ private final ISpServiceDiscovery serviceDiscovery;
+
+ public ExtensionsResourceUrlProvider(ISpServiceDiscovery serviceDiscovery) {
+ this.serviceDiscovery = serviceDiscovery;
+ }
+
+ @Override
+ public String getDescriptionUrl(ExtensionItemInstallationRequest installationReq) {
+ var baseUrl = getMatchingServiceBaseUrl(installationReq.serviceTagPrefix(), installationReq.appId());
+ var urlProvider = getServiceUrlProvider(installationReq.serviceTagPrefix());
+ return urlProvider.getInvocationUrl(baseUrl, installationReq.appId());
+ }
+
+ @Override
+ public String getIconUrl(ExtensionItemDescription endpointItem) {
+ var baseUrl = getMatchingServiceBaseUrl(endpointItem.getServiceTagPrefix(), endpointItem.getAppId());
+ var urlProvider = getServiceUrlProvider(endpointItem.getServiceTagPrefix());
+ return urlProvider.getIconUrl(baseUrl, endpointItem.getAppId());
+ }
+
+ private SpServiceUrlProvider getServiceUrlProvider(SpServiceTagPrefix serviceTagPrefix) {
+ return SpServiceUrlProvider.valueOf(serviceTagPrefix.name());
+ }
+
+ private String getMatchingServiceBaseUrl(SpServiceTagPrefix serviceTagPrefix,
+ String appId) throws RuntimeException {
+ var serviceTag = getServiceTag(serviceTagPrefix, appId);
+ var services = serviceDiscovery
+ .getServiceEndpoints(DefaultSpServiceTypes.EXT, true, List.of(serviceTag));
+ if (!services.isEmpty()) {
+ return services.get(0);
+ } else {
+ throw new RuntimeException("Could not find matching service");
+ }
+ }
+
+ private String getServiceTag(SpServiceTagPrefix tagPrefix,
+ String appId) {
+ return SpServiceTag.create(tagPrefix, appId).asString();
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/HttpJsonParser.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/HttpJsonParser.java
deleted file mode 100644
index 11065e0d09..0000000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/HttpJsonParser.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.endpoint;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-import java.io.IOException;
-import java.net.URI;
-
-
-@SuppressWarnings("deprecation")
-public class HttpJsonParser {
-
- public static String getContentFromUrl(URI uri) throws ClientProtocolException, IOException {
- return getContentFromUrl(uri, null);
- }
-
- public static String getContentFromUrl(URI uri, String header) throws ClientProtocolException, IOException {
- HttpGet request = new HttpGet(uri);
- if (header != null) {
- request.addHeader("Accept", header);
- }
-
- @SuppressWarnings("resource")
- HttpClient client = new DefaultHttpClient();
- HttpResponse response = client.execute(request);
-
- String pageContent = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
-
- return pageContent;
-
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index ce5ede3511..5493bbf69d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.manager.migration;
import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.endpoint.HttpJsonParser;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -137,7 +135,10 @@ public abstract class AbstractMigrationManager {
protected void performUpdate(String requestUrl) {
try {
- var entityPayload = HttpJsonParser.getContentFromUrl(URI.create(requestUrl));
+ var entityPayload = ExtensionServiceExecutions.extServiceGetRequest(requestUrl)
+ .execute()
+ .returnContent()
+ .asString();
var updateResult = Operations.verifyAndUpdateElement(entityPayload);
if (!updateResult.isSuccess()) {
LOG.error(
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
index c88259d5ff..8d76e31252 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
@@ -30,7 +30,6 @@ import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -106,7 +105,7 @@ public class ExtensionsServiceLogExecutor implements Runnable {
return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(
DefaultSpServiceTypes.EXT,
true,
- List.of(DefaultSpServiceTags.PE.asString(), DefaultSpServiceTags.CONNECT_WORKER.asString())
+ List.of()
);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 89f5f44847..83a26c7500 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.manager.operations;
import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.recommender.ElementRecommender;
@@ -34,8 +33,6 @@ import org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler
import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.Pipeline;
@@ -111,10 +108,6 @@ public class Operations {
return new PipelineExecutor(pipeline, forceStop).stopPipeline();
}
- public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
- return new EndpointItemFetcher(endpoints).getItems();
- }
-
public static SpDataStream updateActualTopic(SpDataStream stream) {
return new WildcardTopicGenerator(stream).computeActualTopic();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
index 026b6eef16..f246d55acd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
@@ -18,9 +18,10 @@
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.manager.endpoint.EndpointFetcher;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.manager.endpoint.AvailableExtensionsProvider;
import org.apache.streampipes.model.client.setup.InitialSettings;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+import org.apache.streampipes.storage.api.INoSqlStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +40,13 @@ public class ExtensionsInstallationTask implements Runnable {
private final InitialSettings settings;
private final BackgroundTaskNotifier callback;
+ private final INoSqlStorage storage;
public ExtensionsInstallationTask(InitialSettings settings,
+ INoSqlStorage storage,
BackgroundTaskNotifier callback) {
this.settings = settings;
+ this.storage = storage;
this.callback = callback;
}
@@ -50,26 +54,30 @@ public class ExtensionsInstallationTask implements Runnable {
public void run() {
List<InstallationStep> steps = new ArrayList<>();
if (settings.getInstallPipelineElements()) {
- List<ExtensionsServiceEndpoint> endpoints;
+ List<ExtensionItemDescription> availableExtensions;
int numberOfAttempts = 0;
do {
- endpoints = new EndpointFetcher().getEndpoints();
+
+ availableExtensions = new AvailableExtensionsProvider(storage).getExtensions();
numberOfAttempts++;
- if (endpoints.isEmpty()) {
- LOG.info("Found 0 endpoints - waiting {} seconds to make sure all endpoints have properly started",
+ if (availableExtensions.isEmpty()) {
+ LOG.info("Found 0 extensions - waiting {} seconds to make sure all extension services have properly started",
SLEEP_TIME_SECONDS);
try {
TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.warn("Interrupted. ", e);
}
}
- } while (endpoints.isEmpty() && numberOfAttempts < MAX_RETRIES);
- LOG.info("Found {} endpoints from which we will install extensions.", endpoints.size());
+ } while (availableExtensions.isEmpty() && numberOfAttempts < MAX_RETRIES);
+ LOG.info("Found {} extensions which we will install", availableExtensions.size());
LOG.info(
- "Further available extensions can always be installed by navigating to the 'Install pipeline elements' view");
- for (ExtensionsServiceEndpoint endpoint : endpoints) {
- steps.add(new PipelineElementInstallationStep(endpoint, settings.getInitialAdminUserSid()));
+ "Further available extensions can be installed by navigating to the 'Install pipeline elements' view");
+ for (ExtensionItemDescription extensionItem : availableExtensions) {
+ steps.add(new PipelineElementInstallationStep(
+ extensionItem,
+ settings.getInitialAdminUserSid())
+ );
}
AtomicInteger errorCount = new AtomicInteger(0);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
index 8a067836ac..294fd7073b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.manager.setup;
import org.apache.streampipes.model.client.setup.InitialSettings;
+import org.apache.streampipes.storage.management.StorageDispatcher;
import java.util.ArrayList;
import java.util.List;
@@ -42,6 +43,6 @@ public class InstallationConfiguration {
public static List<Runnable> getBackgroundInstallationSteps(InitialSettings settings,
BackgroundTaskNotifier callback) {
- return List.of(new ExtensionsInstallationTask(settings, callback));
+ return List.of(new ExtensionsInstallationTask(settings, StorageDispatcher.INSTANCE.getNoSqlStore(), callback));
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
index 568f2902f5..f1aa8b47ef 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
@@ -17,71 +17,41 @@
*/
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.manager.endpoint.EndpointItemParser;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
-import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.commons.exceptions.SepaParseException;
+import org.apache.streampipes.manager.endpoint.ExtensionItemInstaller;
+import org.apache.streampipes.manager.endpoint.ExtensionsResourceUrlProvider;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
+import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
public class PipelineElementInstallationStep extends InstallationStep {
- private static final Logger LOG = LoggerFactory.getLogger(PipelineElementInstallationStep.class);
- private static final int MAX_RETRIES = 8;
-
- private final ExtensionsServiceEndpoint endpoint;
+ private final ExtensionItemDescription extensionItem;
private final String principalSid;
- private int retries = 0;
- public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint,
+ public PipelineElementInstallationStep(ExtensionItemDescription extensionItem,
String principalSid) {
- this.endpoint = endpoint;
+ this.extensionItem = extensionItem;
this.principalSid = principalSid;
}
@Override
public void install() {
- List<Message> statusMessages = new ArrayList<>();
- List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
- if (items.isEmpty() && retries <= MAX_RETRIES) {
- retries++;
- LOG.info(
- "Endpoint available but no extensions yet found, so we will retry to fetch pipeline elements ({}/{})",
- retries,
- MAX_RETRIES
- );
- try {
- TimeUnit.SECONDS.sleep(2);
- install();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- LOG.info("Found {} endpoint items for endpoint {}", items.size(), endpoint.getEndpointUrl());
- for (ExtensionsServiceEndpointItem item : items) {
- statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(),
- principalSid, true));
- }
-
- if (statusMessages.stream().allMatch(Message::isSuccess)) {
- logSuccess(getTitle());
- } else {
- logFailure(getTitle());
- }
+ var installationReq = ExtensionItemInstallationRequest.fromDescription(extensionItem, true);
+ var resourceUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
+ try {
+ new ExtensionItemInstaller(resourceUrlProvider).installExtension(installationReq, principalSid);
+ logSuccess(getTitle());
+ } catch (SepaParseException | IOException e) {
+ logFailure(getTitle());
}
-
}
@Override
public String getTitle() {
- return "Installing pipeline elements from " + endpoint.getEndpointUrl();
+ return "Installing extension " + extensionItem.getName();
}
}
diff --git a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
index 989663da65..3900781613 100644
--- a/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
+++ b/streampipes-rest-core-base/src/main/java/org/apache/streampipes/rest/core/base/impl/AbstractRestResource.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.rest.core.base.impl;
-import org.apache.streampipes.manager.endpoint.HttpJsonParser;
import org.apache.streampipes.model.message.ErrorMessage;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notification;
@@ -36,12 +35,8 @@ import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.api.IUserStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.http.client.ClientProtocolException;
import org.springframework.http.ResponseEntity;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLDecoder;
public class AbstractRestResource extends AbstractSharedRestInterface {
@@ -86,17 +81,6 @@ public class AbstractRestResource extends AbstractSharedRestInterface {
return getNoSqlStorage().getFileMetadataStorage();
}
- protected String parseURIContent(String payload) throws URISyntaxException,
- ClientProtocolException, IOException {
- return parseURIContent(payload, null);
- }
-
- protected String parseURIContent(String payload, String mediaType) throws URISyntaxException,
- ClientProtocolException, IOException {
- URI uri = new URI(payload);
- return HttpJsonParser.getContentFromUrl(uri, mediaType);
- }
-
protected ResponseEntity<Message> constructSuccessMessage(Notification... notifications) {
return statusMessage(new SuccessMessage(notifications));
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/PipelineElementImport.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
similarity index 57%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/PipelineElementImport.java
rename to streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
index d6c689818f..6701e29f4b 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/PipelineElementImport.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
@@ -18,27 +18,22 @@
package org.apache.streampipes.rest.impl.admin;
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
-import org.apache.streampipes.manager.endpoint.EndpointItemParser;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.manager.endpoint.ExtensionItemInstaller;
+import org.apache.streampipes.manager.endpoint.ExtensionsResourceUrlProvider;
+import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notification;
import org.apache.streampipes.model.message.NotificationType;
-import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.security.AuthConstants;
-import org.apache.streampipes.rest.shared.exception.SpMessageException;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
-import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@@ -48,37 +43,34 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
-import java.net.URISyntaxException;
@RestController
-@RequestMapping("/api/v2/element")
+@RequestMapping("/api/v2/extension-installation")
@PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
-public class PipelineElementImport extends AbstractAuthGuardedRestResource {
+public class ExtensionsInstallationResource extends AbstractAuthGuardedRestResource {
@PostMapping(
- consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE,
+ consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<Message> addElement(@RequestBody MultiValueMap<String, String> formDataMap) {
- if (formDataMap.containsKey("uri") && formDataMap.containsKey("publicElement")) {
- return ok(verifyAndAddElement(
- formDataMap.get("uri").get(0),
- getAuthenticatedUserSid(),
- Boolean.parseBoolean(formDataMap.get("publicElement").get(0))
- ));
- } else {
- throw new SpMessageException(HttpStatus.BAD_REQUEST, Notifications.error("Invalid input"));
+ public ResponseEntity<Message> addElement(@RequestBody ExtensionItemInstallationRequest installationReq) {
+ var descriptionUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
+ try {
+ return ok(new ExtensionItemInstaller(descriptionUrlProvider)
+ .installExtension(installationReq, getAuthenticatedUserSid()));
+ } catch (IOException | SepaParseException e) {
+ return constructErrorMessage(new Notification(NotificationType.PARSE_ERROR, e.getMessage()));
}
}
- @PutMapping(path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<Message> updateElement(@PathVariable("id") String elementId) {
+ @PutMapping(
+ consumes = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<Message> updateElement(@RequestBody ExtensionItemInstallationRequest installationReq) {
+ var descriptionUrlProvider = new ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
try {
- NamedStreamPipesEntity entity = find(elementId);
- String url = new ExtensionsServiceEndpointGenerator(entity).getEndpointResourceUrl();
- String payload = parseURIContent(url);
- return ok(Operations.verifyAndUpdateElement(payload));
- } catch (URISyntaxException | IOException | SepaParseException | NoServiceEndpointsAvailableException e) {
- e.printStackTrace();
+ return ok(new ExtensionItemInstaller(descriptionUrlProvider)
+ .updateExtension(installationReq));
+ } catch (IOException | SepaParseException e) {
return constructErrorMessage(new Notification(NotificationType.PARSE_ERROR, e.getMessage()));
}
}
@@ -112,25 +104,4 @@ public class PipelineElementImport extends AbstractAuthGuardedRestResource {
}
return constructSuccessMessage(NotificationType.STORAGE_SUCCESS.uiNotification());
}
-
- private Message verifyAndAddElement(String uri,
- String principalSid,
- boolean publicElement) {
- return new EndpointItemParser().parseAndAddEndpointItem(uri, principalSid, publicElement);
- }
-
- private NamedStreamPipesEntity find(String elementId) {
- var extensionStorage = getPipelineElementStorage();
- if (extensionStorage.existsDataSink(elementId)) {
- return extensionStorage.getDataSinkById(elementId);
- } else if (extensionStorage.existsDataProcessor(elementId)) {
- return extensionStorage.getDataProcessorById(elementId);
- } else if (extensionStorage.existsDataStream(elementId)) {
- return extensionStorage.getDataStreamById(elementId);
- } else if (extensionStorage.existsAdapterDescription(elementId)) {
- return extensionStorage.getAdapterById(elementId);
- } else {
- throw new IllegalArgumentException("Could not find element for ID " + elementId);
- }
- }
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsServiceEndpointResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsServiceEndpointResource.java
index d1f3c6941f..2322712466 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsServiceEndpointResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsServiceEndpointResource.java
@@ -18,21 +18,17 @@
package org.apache.streampipes.rest.impl.admin;
-import org.apache.streampipes.manager.endpoint.EndpointFetcher;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.manager.assets.AssetManager;
+import org.apache.streampipes.manager.endpoint.AvailableExtensionsProvider;
+import org.apache.streampipes.manager.endpoint.ExtensionsResourceUrlProvider;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.security.AuthConstants;
import org.apache.streampipes.rest.shared.exception.SpMessageException;
-import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.http.client.fluent.Request;
+
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@@ -44,144 +40,37 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
@RestController
-@RequestMapping("/api/v2/rdfendpoints")
+@RequestMapping("/api/v2/extension-items")
@PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
public class ExtensionsServiceEndpointResource extends AbstractAuthGuardedRestResource {
- @GetMapping(path = "/items", produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<List<ExtensionsServiceEndpointItem>> getEndpointContents() {
- List<ExtensionsServiceEndpoint> endpoints = getEndpoints();
-
- var installedExtensions = getAllInstalledExtensions();
- List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(endpoints);
- items.forEach(item -> item.setInstalled(isInstalled(installedExtensions, item.getAppId())));
-
- // also add installed elements that are currently not running or available
- items.addAll(getAllAdapterEndpoints(items));
- items.addAll(getAllDataStreamEndpoints(items));
- items.addAll(getAllDataProcessorEndpoints(items));
- items.addAll(getAllDataSinkEndpoints(items));
-
- return ok(items);
+ @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<List<ExtensionItemDescription>> getExtensionItems() {
+ var allExtensions = new AvailableExtensionsProvider(getNoSqlStorage()).getExtensions();
+ return ok(allExtensions);
}
- @PostMapping(path = "/items/icon", produces = "image/png")
- public ResponseEntity<byte[]> getEndpointItemIcon(@RequestBody ExtensionsServiceEndpointItem endpointItem) {
+ @PostMapping(path = "/icon", produces = "image/png")
+ public ResponseEntity<byte[]> getExtensionItemIcon(@RequestBody ExtensionItemDescription endpointItem) {
try {
- byte[] imageBytes = Request.Get(makeIconUrl(endpointItem)).execute().returnContent().asBytes();
+ byte[] imageBytes = getIconImage(endpointItem);
return ok(imageBytes);
} catch (IOException e) {
throw new SpMessageException(HttpStatus.BAD_REQUEST, e);
}
}
- private String makeIconUrl(ExtensionsServiceEndpointItem endpointItem) {
- return endpointItem.getUri() + "/assets/icon";
- }
-
- private List<ExtensionsServiceEndpoint> getEndpoints() {
- return new EndpointFetcher().getEndpoints();
- }
-
- private boolean isInstalled(List<NamedStreamPipesEntity> installedElements,
- String appId) {
- return installedElements
- .stream()
- .filter(e -> !(e instanceof SpDataStream)) // DataStreams are not getting installed
- .anyMatch(e -> e.getAppId().equals(appId));
- }
-
- private List<NamedStreamPipesEntity> getAllInstalledExtensions() {
- List<NamedStreamPipesEntity> elements = new ArrayList<>();
- elements.addAll(getAllAdapters());
- elements.addAll(getAllDataStreams());
- elements.addAll(getAllDataProcessors());
- elements.addAll(getAllDataSinks());
- return elements;
- }
-
- private List<ExtensionsServiceEndpointItem> getAllAdapterEndpoints(
- List<ExtensionsServiceEndpointItem> existingItems) {
- return getAllAdapters()
- .stream()
- .filter(s -> existingItems.stream().noneMatch(item -> s.getAppId().equals(item.getAppId())))
- .map(adapter -> makeItem(adapter, "adapter"))
- .toList();
- }
-
- private List<ExtensionsServiceEndpointItem> getAllDataStreamEndpoints(
- List<ExtensionsServiceEndpointItem> existingItems) {
- return getAllDataStreams()
- .stream()
- // compared to similar methods we use the elementId here instead of the appId
- // because data streams are supposed to do not have an appId
- .filter(s -> existingItems.stream().noneMatch(item -> s.getElementId().equals(item.getElementId())))
- .filter(s -> !s.isInternallyManaged())
- .map(stream -> makeItem(stream, "stream"))
- .toList();
- }
-
-
- private List<ExtensionsServiceEndpointItem> getAllDataProcessorEndpoints(
- List<ExtensionsServiceEndpointItem> existingItems) {
-
- return getAllDataProcessors()
- .stream()
- .filter(s -> existingItems.stream().noneMatch(item -> s.getAppId().equals(item.getAppId())))
- .map(source -> makeItem(source, "sepa"))
- .toList();
- }
-
- private List<ExtensionsServiceEndpointItem> getAllDataSinkEndpoints(
- List<ExtensionsServiceEndpointItem> existingItems) {
-
- return getAllDataSinks()
- .stream()
- .filter(s -> existingItems.stream().noneMatch(item -> s.getAppId().equals(item.getAppId())))
- .map(source -> makeItem(source, "action"))
- .toList();
- }
-
- private ExtensionsServiceEndpointItem makeItem(NamedStreamPipesEntity entity, String type) {
- ExtensionsServiceEndpointItem endpoint = new ExtensionsServiceEndpointItem();
- endpoint.setInstalled(true);
- endpoint.setDescription(entity.getDescription());
- endpoint.setName(entity.getName());
- endpoint.setType(type);
- endpoint.setAvailable(false);
- endpoint.setElementId(entity.getElementId());
- endpoint.setUri(entity.getElementId());
- endpoint.setEditable(!(entity.isInternallyManaged()));
- endpoint.setIncludesIcon(entity.isIncludesAssets() && entity.getIncludedAssets().contains(Assets.ICON));
- endpoint.setIncludesDocs(entity.isIncludesAssets() && entity.getIncludedAssets().contains(Assets.DOCUMENTATION));
-
- if (!(entity instanceof SpDataStream)) {
- endpoint.setAppId(entity.getAppId());
+ private byte[] getIconImage(ExtensionItemDescription extensionItemDescription) throws IOException {
+ if (extensionItemDescription.isInstalled()) {
+ return AssetManager.getAssetIcon(extensionItemDescription.getAppId());
+ } else {
+ var iconUrl = new ExtensionsResourceUrlProvider(
+ SpServiceDiscovery.getServiceDiscovery()
+ ).getIconUrl(extensionItemDescription);
+ return Request.Get(iconUrl).execute().returnContent().asBytes();
}
-
- return endpoint;
- }
-
- private List<AdapterDescription> getAllAdapters() {
- return getNoSqlStorage().getAdapterDescriptionStorage().getAllAdapters();
- }
-
- private List<SpDataStream> getAllDataStreams() {
- return getNoSqlStorage().getDataStreamStorage().getAll()
- .stream()
- .filter(stream -> !stream.isInternallyManaged())
- .toList();
- }
-
- private List<DataProcessorDescription> getAllDataProcessors() {
- return getNoSqlStorage().getDataProcessorStorage().getAll();
- }
-
- private List<DataSinkDescription> getAllDataSinks() {
- return getNoSqlStorage().getDataSinkStorage().getAll();
}
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
index f9205ff835..350857bb2a 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/utils/Assets.java
@@ -17,8 +17,14 @@
*/
package org.apache.streampipes.sdk.utils;
+import org.apache.streampipes.model.extensions.ExtensionAsset;
+
+/**
+ * @deprecated Use {@link ExtensionAsset} instead
+ */
+@Deprecated(forRemoval = true, since = "0.95.0")
public class Assets {
- public static final String DOCUMENTATION = "documentation.md";
- public static final String ICON = "icon.png";
+ public static final String DOCUMENTATION = ExtensionAsset.DOCUMENTATION;
+ public static final String ICON = ExtensionAsset.ICON;
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
index 573563a325..0631715745 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
@@ -23,6 +23,5 @@ public class SpServicePathPrefix {
public static final String DATA_SINK = "sec";
public static final String DATA_STREAM = "stream";
public static final String ADAPTER = "api/v1/worker/adapters";
- public static final String PROTOCOL = "api/v1/worker/protocols";
}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
index 892086d073..71b0b57548 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.svcdiscovery.api.model;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import java.util.StringJoiner;
+
public enum SpServiceUrlProvider {
DATA_PROCESSOR(SpServicePathPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_PROCESSOR),
@@ -43,18 +45,6 @@ public enum SpServiceUrlProvider {
return this.prefix;
}
- public String getInvocationUrl(String host,
- Integer port,
- String appId) {
- return http
- + host
- + ":"
- + port
- + slash
- + this.prefix
- + slash + appId;
- }
-
public String getInvocationUrl(String baseUrl, String appId) {
return baseUrl
+ slash
@@ -63,23 +53,14 @@ public enum SpServiceUrlProvider {
+ appId;
}
- public String getDetachUrl(String host,
- Integer port,
- String appId,
- String invocationId) {
- return getInvocationUrl(host, port, appId)
- + slash
- + invocationId;
- }
-
- public String getDetachUrl(String baseUrl, String appId, String invocationId) {
- return getInvocationUrl(baseUrl, appId)
- + slash
- + invocationId;
- }
-
- public SpServiceTagPrefix getServiceTagPrefix() {
- return serviceTagPrefix;
+ public String getIconUrl(String baseUrl, String appId) {
+ return new StringJoiner(slash)
+ .add(baseUrl)
+ .add(this.prefix)
+ .add(appId)
+ .add("assets")
+ .add("icon")
+ .toString();
}
public SpServiceTag getServiceTag(String appId) {
diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java
index 9d6851289f..55ee98bd1b 100644
--- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java
+++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
import org.apache.streampipes.storage.api.CRUDStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
import org.slf4j.Logger;
@@ -49,8 +48,9 @@ public class SpServiceDiscoveryCore implements ISpServiceDiscovery {
@Override
public List<String> getActivePipelineElementEndpoints() {
LOG.info("Discovering active pipeline element service endpoints");
- return getServiceEndpoints(DefaultSpServiceTypes.EXT, true,
- Collections.singletonList(DefaultSpServiceTags.PE.asString()));
+ return getServiceEndpoints(DefaultSpServiceTypes.EXT,
+ true,
+ List.of());
}
@Override
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java
index ff7c721524..29b60bd186 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java
@@ -17,55 +17,9 @@
*/
package org.apache.streampipes.service.extensions;
-import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.extensions.api.migration.IModelMigrator;
-import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
-import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
-import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
-import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
-import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
-import org.apache.streampipes.rest.extensions.WelcomePage;
-import org.apache.streampipes.service.base.rest.ServiceHealthResource;
-import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
-import org.apache.streampipes.service.extensions.security.WebSecurityConfig;
-
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-
-import jakarta.annotation.PreDestroy;
-
-import java.util.List;
-
-@Configuration
-@EnableAutoConfiguration
-@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class})
-@ComponentScan({"org.apache.streampipes.rest.extensions.*", "org.apache.streampipes.service.base.rest.*"})
+/**
+ * @deprecated Use {@link StreamPipesExtensionsServiceBase} instead
+ */
+@Deprecated(forRemoval = true, since = "0.95.0")
public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase {
-
- @PreDestroy
- public void onExit() {
- new ExtensionsServiceShutdownHandler().onShutdown();
- StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
- deregisterService(DeclarersSingleton.getInstance().getServiceId());
- }
-
- @Override
- public void afterServiceRegistered(SpServiceDefinition serviceDef,
- SpServiceRegistration serviceReg) {
- StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
-
- // register all migrations at StreamPipes Core
- var migrationConfigs = serviceDef.getMigrators().stream().map(IModelMigrator::config).toList();
- new CoreRequestSubmitter().submitMigrationRequest(client, migrationConfigs, serviceId(), serviceReg);
-
- // initialize all function instances
- StreamPipesFunctionHandler.INSTANCE.initializeFunctions(serviceDef.getServiceGroup());
- }
-
- @Override
- protected List<SpServiceTag> getExtensionsServiceTags() {
- return new ServiceTagProvider().extractServiceTags();
- }
}
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ServiceTagProvider.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ServiceTagProvider.java
deleted file mode 100644
index f565ec46e7..0000000000
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ServiceTagProvider.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.service.extensions;
-
-import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
-import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
-import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
-import org.apache.streampipes.extensions.management.util.ServiceDefinitionUtil;
-import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-public class ServiceTagProvider {
-
- public List<SpServiceTag> extractServiceTags() {
- var tags = new ArrayList<SpServiceTag>();
- tags.addAll(extractPipelineElementServiceTags());
- tags.addAll(extractAdapterServiceTags());
-
- return tags;
- }
-
- private List<SpServiceTag> extractPipelineElementServiceTags() {
- Collection<IStreamPipesPipelineElement<?>> declarers =
- DeclarersSingleton.getInstance().getDeclarers().values();
- List<SpServiceTag> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
- serviceTags.add(DefaultSpServiceTags.PE);
-
- return serviceTags;
- }
-
- private List<SpServiceTag> extractAdapterServiceTags() {
- Collection<StreamPipesAdapter> adapters = DeclarersSingleton.getInstance().getAdapters();
- var tags = new ArrayList<>(ServiceDefinitionUtil.extractAppIdsFromAdapters(adapters));
- tags.add(DefaultSpServiceTags.CONNECT_WORKER);
-
- return tags;
- }
-}
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index 11c351c8f4..b754416094 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -19,16 +19,23 @@
package org.apache.streampipes.service.extensions;
import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
+import org.apache.streampipes.extensions.management.locales.LabelGenerator;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.model.extensions.ExtensionItemDescription;
import org.apache.streampipes.model.extensions.configuration.ConfigItem;
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.rest.extensions.WelcomePage;
import org.apache.streampipes.service.base.BaseNetworkingConfig;
import org.apache.streampipes.service.base.StreamPipesServiceBase;
+import org.apache.streampipes.service.base.rest.ServiceHealthResource;
+import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
+import org.apache.streampipes.service.extensions.security.WebSecurityConfig;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
import org.slf4j.Logger;
@@ -36,10 +43,23 @@ import org.slf4j.LoggerFactory;
import jakarta.annotation.PreDestroy;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import java.io.IOException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Configuration
+@EnableAutoConfiguration
+@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class})
+@ComponentScan({"org.apache.streampipes.rest.extensions.*", "org.apache.streampipes.service.base.rest.*"})
public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServiceBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
@@ -64,22 +84,31 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
}
}
- public abstract SpServiceDefinition provideServiceDefinition();
+ public void afterServiceRegistered(SpServiceDefinition serviceDef,
+ SpServiceRegistration serviceReg) {
+ StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
- public abstract void afterServiceRegistered(SpServiceDefinition serviceDef,
- SpServiceRegistration serviceReg);
+ // register all migrations at StreamPipes Core
+ var migrationConfigs = serviceDef.getMigrators().stream().map(IModelMigrator::config).toList();
+ new CoreRequestSubmitter().submitMigrationRequest(client, migrationConfigs, serviceId(), serviceReg);
+
+ // initialize all function instances
+ StreamPipesFunctionHandler.INSTANCE.initializeFunctions(serviceDef.getServiceGroup());
+ }
public void startExtensionsService(Class<?> serviceClass,
SpServiceDefinition serviceDef,
BaseNetworkingConfig networkingConfig) throws UnknownHostException {
+ var extensions = getAllExtensions();
var req = SpServiceRegistration.from(
DefaultSpServiceTypes.EXT,
serviceDef.getServiceGroup(),
serviceId(),
networkingConfig.getHost(),
networkingConfig.getPort(),
- getServiceTags(),
- getHealthCheckPath());
+ getServiceTags(extensions),
+ getHealthCheckPath(),
+ extensions);
LOG.info("Registering service {} with id {} at core", req.getSvcGroup(), req.getSvcId());
registerService(req);
@@ -98,22 +127,50 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
new CoreRequestSubmitter().submitRegistrationRequest(client, serviceRegistration);
}
- protected List<SpServiceTag> getServiceTags() {
- List<SpServiceTag> tags = new ArrayList<>();
+ protected Set<SpServiceTag> getServiceTags(Set<ExtensionItemDescription> extensions) {
+ Set<SpServiceTag> tags = new HashSet<>();
if (DeclarersSingleton.getInstance().getServiceDefinition() != null) {
tags.add(SpServiceTag.create(SpServiceTagPrefix.SP_GROUP,
DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup()));
}
- tags.addAll(getExtensionsServiceTags());
+ tags.addAll(getExtensionsServiceTags(extensions));
return tags;
}
+ private Set<ExtensionItemDescription> getAllExtensions() {
+ return Stream.concat(
+ DeclarersSingleton.getInstance().getDeclarers().values().stream()
+ .map(declarer -> declarer.declareConfig().getDescription()),
+ DeclarersSingleton.getInstance().getAdapters().stream().map(a -> a.declareConfig().getAdapterDescription())
+ )
+ .peek(entity -> {
+ try {
+ if (entity.isIncludesLocales()) {
+ var labelGenerator = new LabelGenerator<>(entity);
+ entity.setName(labelGenerator.getElementTitle());
+ entity.setDescription(labelGenerator.getElementDescription());
+ }
+ } catch (IOException e) {
+
+ }
+ })
+ .map(entity -> entity.toExtensionDescription(false, true, true))
+ .collect(Collectors.toSet());
+ }
+
protected void deregisterService(String serviceId) {
LOG.info("Deregistering service (id={})...", serviceId);
StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
client.adminApi().deregisterService(serviceId);
}
+ protected Set<SpServiceTag> getExtensionsServiceTags(Set<ExtensionItemDescription> extensions) {
+ return extensions
+ .stream()
+ .map(e -> SpServiceTag.create(e.getServiceTagPrefix(), e.getAppId()))
+ .collect(Collectors.toSet());
+ }
+
private void registerConfigs(String serviceGroup,
String serviceName,
List<ConfigItem> configs) {
@@ -123,13 +180,17 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
client.adminApi().registerServiceConfiguration(serviceConfiguration);
}
- protected abstract List<SpServiceTag> getExtensionsServiceTags();
-
@PreDestroy
- public abstract void onExit();
+ public void onExit() {
+ new ExtensionsServiceShutdownHandler().onShutdown();
+ StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
+ deregisterService(DeclarersSingleton.getInstance().getServiceId());
+ }
public String serviceId() {
return DeclarersSingleton.getInstance().getServiceId();
}
+ public abstract SpServiceDefinition provideServiceDefinition();
+
}
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
deleted file mode 100644
index 75d3b376d3..0000000000
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.storage.api;
-
-import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
-
-import java.util.List;
-
-public interface IExtensionsServiceEndpointStorage {
-
- void addExtensionsServiceEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint);
-
- void removeExtensionsServiceEndpoint(String extensionServiceEndpointId);
-
- List<ExtensionsServiceEndpoint> getExtensionsServiceEndpoints();
-}
diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-element-endpoint.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/extension-installation.service.ts
similarity index 65%
rename from ui/projects/streampipes/platform-services/src/lib/apis/pipeline-element-endpoint.service.ts
rename to ui/projects/streampipes/platform-services/src/lib/apis/extension-installation.service.ts
index 9c13171267..8381a79ab9 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/pipeline-element-endpoint.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/extension-installation.service.ts
@@ -2,6 +2,7 @@ import { Injectable } from '@angular/core';
import { HttpClient, HttpParams } from '@angular/common/http';
import { PlatformServicesCommons } from './commons.service';
import { Observable } from 'rxjs';
+import { ExtensionItemInstallationRequest } from '../model/gen/streampipes-model';
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,36 +25,33 @@ import { Observable } from 'rxjs';
@Injectable({
providedIn: 'root',
})
-export class PipelineElementEndpointService {
+export class ExtensionInstallationService {
constructor(
private http: HttpClient,
private platformServicesCommons: PlatformServicesCommons,
) {}
- add(elementUri, ispublic): Observable<any> {
- const payload = new HttpParams()
- .set('uri', elementUri)
- .set('publicElement', ispublic);
+ add(
+ installationRequest: ExtensionItemInstallationRequest,
+ ): Observable<any> {
return this.http.post(
- this.platformServicesCommons.apiBasePath + '/element',
- payload,
+ `${this.platformServicesCommons.apiBasePath}/extension-installation`,
+ installationRequest,
);
}
- update(elementUri): Observable<any> {
+ update(
+ installationRequest: ExtensionItemInstallationRequest,
+ ): Observable<any> {
return this.http.put(
- this.platformServicesCommons.apiBasePath +
- '/element/' +
- encodeURIComponent(elementUri),
- undefined,
+ `${this.platformServicesCommons.apiBasePath}/extension-installation`,
+ installationRequest,
);
}
- del(elementUri): Observable<any> {
+ delete(elementId: string): Observable<any> {
return this.http.delete(
- this.platformServicesCommons.apiBasePath +
- '/element/' +
- encodeURIComponent(elementUri),
+ `${this.platformServicesCommons.apiBasePath}/extension-installation/${elementId}`,
);
}
}
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 6523bfe64a..03d64442c9 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2023-12-04 13:14:24.
+// Generated using typescript-generator version 3.2.1263 on 2024-03-27 09:18:38.
export class NamedStreamPipesEntity {
'@class':
@@ -1790,6 +1791,60 @@ export class ExportItem {
}
}
+export class ExtensionItemDescription {
+ appId: string;
+ available: boolean;
+ description: string;
+ editable: boolean;
+ elementId: string;
+ includesDocs: boolean;
+ includesIcon: boolean;
+ installed: boolean;
+ name: string;
+ serviceTagPrefix: SpServiceTagPrefix;
+
+ static fromData(
+ data: ExtensionItemDescription,
+ target?: ExtensionItemDescription,
+ ): ExtensionItemDescription {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new ExtensionItemDescription();
+ instance.appId = data.appId;
+ instance.available = data.available;
+ instance.description = data.description;
+ instance.editable = data.editable;
+ instance.elementId = data.elementId;
+ instance.includesDocs = data.includesDocs;
+ instance.includesIcon = data.includesIcon;
+ instance.installed = data.installed;
+ instance.name = data.name;
+ instance.serviceTagPrefix = data.serviceTagPrefix;
+ return instance;
+ }
+}
+
+export class ExtensionItemInstallationRequest {
+ appId: string;
+ publicElement: boolean;
+ serviceTagPrefix: SpServiceTagPrefix;
+
+ static fromData(
+ data: ExtensionItemInstallationRequest,
+ target?: ExtensionItemInstallationRequest,
+ ): ExtensionItemInstallationRequest {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new ExtensionItemInstallationRequest();
+ instance.appId = data.appId;
+ instance.publicElement = data.publicElement;
+ instance.serviceTagPrefix = data.serviceTagPrefix;
+ return instance;
+ }
+}
+
export class FieldStatusInfo {
additionalInfo: string;
changesRequired: boolean;
@@ -1814,8 +1869,8 @@ export class FileMetadata {
createdAt: number;
createdByUser: string;
fileId: string;
- filetype: string;
filename: string;
+ filetype: string;
lastModified: number;
rev: string;
@@ -1827,8 +1882,8 @@ export class FileMetadata {
instance.createdAt = data.createdAt;
instance.createdByUser = data.createdByUser;
instance.fileId = data.fileId;
- instance.filetype = data.filetype;
instance.filename = data.filename;
+ instance.filetype = data.filetype;
instance.lastModified = data.lastModified;
instance.rev = data.rev;
return instance;
@@ -3545,6 +3600,7 @@ export class SpQueryResult {
allDataSeries: DataSeries[];
forId: string;
headers: string[];
+ lastTimestamp: number;
sourceIndex: number;
spQueryStatus: SpQueryStatus;
total: number;
@@ -3562,6 +3618,7 @@ export class SpQueryResult {
);
instance.forId = data.forId;
instance.headers = __getCopyArrayFn(__identity<string>())(data.headers);
+ instance.lastTimestamp = data.lastTimestamp;
instance.sourceIndex = data.sourceIndex;
instance.spQueryStatus = data.spQueryStatus;
instance.total = data.total;
@@ -3596,6 +3653,7 @@ export class SpServiceRegistration {
healthCheckPath: string;
host: string;
port: number;
+ providedExtensions: ExtensionItemDescription[];
rev: string;
scheme: string;
serviceUrl: string;
@@ -3617,6 +3675,9 @@ export class SpServiceRegistration {
instance.healthCheckPath = data.healthCheckPath;
instance.host = data.host;
instance.port = data.port;
+ instance.providedExtensions = __getCopyArrayFn(
+ ExtensionItemDescription.fromData,
+ )(data.providedExtensions);
instance.rev = data.rev;
instance.scheme = data.scheme;
instance.serviceUrl = data.serviceUrl;
@@ -4111,7 +4172,6 @@ export type SpServiceStatus =
| 'UNHEALTHY';
export type SpServiceTagPrefix =
- | 'SYSTEM'
| 'SP_GROUP'
| 'ADAPTER'
| 'DATA_STREAM'
diff --git a/ui/projects/streampipes/platform-services/src/public-api.ts b/ui/projects/streampipes/platform-services/src/public-api.ts
index 2e44e5cf9a..669bf4f238 100644
--- a/ui/projects/streampipes/platform-services/src/public-api.ts
+++ b/ui/projects/streampipes/platform-services/src/public-api.ts
@@ -40,7 +40,7 @@ export * from './lib/apis/permissions.service';
export * from './lib/apis/pipeline.service';
export * from './lib/apis/pipeline-canvas-metadata.service';
export * from './lib/apis/pipeline-element.service';
-export * from './lib/apis/pipeline-element-endpoint.service';
+export * from './lib/apis/extension-installation.service';
export * from './lib/apis/pipeline-element-template.service';
export * from './lib/apis/pipeline-monitoring.service';
export * from './lib/apis/pipeline-template.service';
diff --git a/ui/src/app/add/add.component.html b/ui/src/app/add/add.component.html
index 78d0dcac10..afc4d74495 100644
--- a/ui/src/app/add/add.component.html
+++ b/ui/src/app/add/add.component.html
@@ -74,13 +74,15 @@
<mat-form-field color="accent" appearance="outline">
<mat-select
[(value)]="selectedCategory"
- (selectionChange)="filterByCatergory($event)"
+ (selectionChange)="filterByCategory($event)"
>
- <mat-option [value]="'all'"> All </mat-option>
- <mat-option [value]="'adapter'"> Adapters </mat-option>
- <mat-option [value]="'stream'"> Streams </mat-option>
- <mat-option [value]="'sepa'"> Processors </mat-option>
- <mat-option [value]="'action'"> Sinks </mat-option>
+ <mat-option value="all"> All </mat-option>
+ <mat-option value="ADAPTER"> Adapters </mat-option>
+ <mat-option value="DATA_STREAM"> Streams </mat-option>
+ <mat-option value="DATA_PROCESSOR">
+ Processors
+ </mat-option>
+ <mat-option value="DATA_SINK"> Sinks </mat-option>
</mat-select>
</mat-form-field>
</div>
diff --git a/ui/src/app/add/add.component.ts b/ui/src/app/add/add.component.ts
index f5bcffad7a..d63ce6c56e 100644
--- a/ui/src/app/add/add.component.ts
+++ b/ui/src/app/add/add.component.ts
@@ -25,9 +25,9 @@ import {
SpBreadcrumbService,
} from '@streampipes/shared-ui';
import { EndpointInstallationComponent } from './dialogs/endpoint-installation/endpoint-installation.component';
-import { ExtensionsServiceEndpointItem } from '@streampipes/platform-services';
-import { Router } from '@angular/router';
import { SpAddRoutes } from './add.routes';
+import { ExtensionItemDescription } from '@streampipes/platform-services';
+import { MatSelectChange } from '@angular/material/select';
@Component({
selector: 'sp-add',
@@ -39,14 +39,12 @@ export class AddComponent implements OnInit {
results: any[];
loading: boolean;
- endpointItems: ExtensionsServiceEndpointItem[];
+ endpointItems: ExtensionItemDescription[];
endpointItemsLoadingComplete: boolean;
selectedTab: string;
- availableTypes: string[] = ['all', 'set', 'stream', 'sepa', 'action'];
-
selectedCategory = 'all';
- selectedEndpointItems: any[] = [];
+ selectedEndpointItems: ExtensionItemDescription[] = [];
_filterTerm = '';
_selectedInstallationStatus = 'all';
@@ -55,7 +53,6 @@ export class AddComponent implements OnInit {
private addService: AddService,
private dialogService: DialogService,
private changeDetectorRef: ChangeDetectorRef,
- private router: Router,
private breadcrumbService: SpBreadcrumbService,
) {
this.results = [];
@@ -72,43 +69,41 @@ export class AddComponent implements OnInit {
this.selectedTab = 'all';
}
- toggleSelected(endpointItem) {
+ toggleSelected(endpointItem: ExtensionItemDescription) {
if (endpointItem.editable) {
if (
- this.selectedEndpointItems.some(
- item => item === endpointItem.uri,
- )
+ this.selectedEndpointItems.some(item => item === endpointItem)
) {
this.selectedEndpointItems.splice(
- this.selectedEndpointItems.indexOf(endpointItem.uri),
+ this.selectedEndpointItems.indexOf(endpointItem),
1,
);
} else {
- this.selectedEndpointItems.push(endpointItem.uri);
+ this.selectedEndpointItems.push(endpointItem);
}
- endpointItem.selected = !endpointItem.selected;
+ (endpointItem as any).selected = !(endpointItem as any).selected;
}
}
- isSelected(endpointItem) {
- return endpointItem.selected;
+ isSelected(endpointItem: ExtensionItemDescription) {
+ return (endpointItem as any).selected;
}
- filterByCatergory(category) {
+ filterByCategory(category: MatSelectChange) {
this.selectedTab = category.value;
}
- selectAll(selected) {
+ selectAll(selected: boolean) {
this.selectedEndpointItems = [];
this.endpointItems.forEach(item => {
if (item.editable) {
if (
- item.type === this.selectedTab ||
+ item.serviceTagPrefix === this.selectedTab ||
this.selectedTab === 'all'
) {
(item as any).selected = selected;
if (selected) {
- this.selectedEndpointItems.push(item.uri);
+ this.selectedEndpointItems.push(item);
}
}
}
@@ -136,7 +131,10 @@ export class AddComponent implements OnInit {
const elementsToInstall = [];
this.endpointItems.forEach(item => {
- if (item.type === this.selectedTab || this.selectedTab === 'all') {
+ if (
+ item.serviceTagPrefix === this.selectedTab ||
+ this.selectedTab === 'all'
+ ) {
if (item.installed === !install && (item as any).selected) {
elementsToInstall.push(item);
}
diff --git a/ui/src/app/add/add.module.ts b/ui/src/app/add/add.module.ts
index 65e79d61d0..0347e2c948 100644
--- a/ui/src/app/add/add.module.ts
+++ b/ui/src/app/add/add.module.ts
@@ -38,6 +38,11 @@ import { MatOptionModule } from '@angular/material/core';
import { MatTooltipModule } from '@angular/material/tooltip';
import { MatIconModule } from '@angular/material/icon';
import { MatSelectModule } from '@angular/material/select';
+import { MatMenuModule } from '@angular/material/menu';
+import { MatFormFieldModule } from '@angular/material/form-field';
+import { MatInputModule } from '@angular/material/input';
+import { MatButtonModule } from '@angular/material/button';
+import { MatCheckboxModule } from '@angular/material/checkbox';
@NgModule({
imports: [
@@ -45,7 +50,12 @@ import { MatSelectModule } from '@angular/material/select';
CoreUiModule,
FormsModule,
FlexLayoutModule,
+ MatButtonModule,
+ MatCheckboxModule,
MatDividerModule,
+ MatFormFieldModule,
+ MatInputModule,
+ MatMenuModule,
MatOptionModule,
MatIconModule,
MatSelectModule,
diff --git a/ui/src/app/add/components/endpoint-item/endpoint-item.component.html b/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
index 1a536414f1..4b3393255e 100644
--- a/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
+++ b/ui/src/app/add/components/endpoint-item/endpoint-item.component.html
@@ -49,13 +49,13 @@
<div style="margin-right: 10px; margin-left: 10px; margin-top: 10px">
<div
*ngIf="!item.includesIcon || iconError"
- class="draggable-icon {{ item.type }}"
+ class="draggable-icon {{ cssMapper[item.serviceTagPrefix] }}"
>
{{ iconText(item.name) }}
</div>
<div
*ngIf="item.includesIcon && iconReady"
- class="draggable-icon {{ item.type }}"
+ class="draggable-icon {{ cssMapper[item.serviceTagPrefix] }}"
>
<img class="icon" [src]="image" />
</div>
@@ -101,7 +101,7 @@
<mat-menu #menu="matMenu">
<button
mat-menu-item
- (click)="refresh(item.elementId)"
+ (click)="refresh(item)"
[disabled]="!item.available"
>
<mat-icon>refresh</mat-icon>
diff --git a/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts b/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
index 73c8f331a1..45190524e6 100644
--- a/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
+++ b/ui/src/app/add/components/endpoint-item/endpoint-item.component.ts
@@ -21,8 +21,9 @@ import { MatSnackBar } from '@angular/material/snack-bar';
import { AddService } from '../../services/add.service';
import { DomSanitizer, SafeUrl } from '@angular/platform-browser';
import {
- ExtensionsServiceEndpointItem,
- PipelineElementEndpointService,
+ ExtensionItemInstallationRequest,
+ ExtensionInstallationService,
+ ExtensionItemDescription,
} from '@streampipes/platform-services';
import { AppConstants } from '../../../services/app.constants';
import { ObjectPermissionDialogComponent } from '../../../core-ui/object-permission-dialog/object-permission-dialog.component';
@@ -35,7 +36,7 @@ import { PanelType, DialogService } from '@streampipes/shared-ui';
})
export class EndpointItemComponent implements OnInit {
@Input()
- item: ExtensionsServiceEndpointItem;
+ item: ExtensionItemDescription;
itemTypeTitle: string;
itemTypeStyle: string;
@@ -47,12 +48,19 @@ export class EndpointItemComponent implements OnInit {
iconReady = false;
iconError = false;
+ cssMapper: Record<string, string> = {
+ ADAPTER: 'adapter',
+ DATA_STREAM: 'stream',
+ DATA_PROCESSOR: 'sepa',
+ DATA_SINK: 'action',
+ };
+
@Output()
triggerInstallation: EventEmitter<any> = new EventEmitter<any>();
constructor(
private snackBar: MatSnackBar,
- private pipelineElementEndpointService: PipelineElementEndpointService,
+ private extensionInstallationService: ExtensionInstallationService,
private addService: AddService,
private sanitizer: DomSanitizer,
public appConstants: AppConstants,
@@ -99,11 +107,11 @@ export class EndpointItemComponent implements OnInit {
}
findItemTypeTitle() {
- if (this.item.type === 'adapter') {
+ if (this.item.serviceTagPrefix === 'ADAPTER') {
this.itemTypeTitle = 'Adapter';
- } else if (this.item.type === 'stream') {
+ } else if (this.item.serviceTagPrefix === 'DATA_STREAM') {
this.itemTypeTitle = 'Data Stream';
- } else if (this.item.type === 'sepa') {
+ } else if (this.item.serviceTagPrefix === 'DATA_PROCESSOR') {
this.itemTypeTitle = 'Data Processor';
} else {
this.itemTypeTitle = 'Data Sink';
@@ -112,62 +120,59 @@ export class EndpointItemComponent implements OnInit {
findItemStyle() {
const baseType = 'pe-label ';
- if (this.item.type === 'stream') {
+ if (this.item.serviceTagPrefix === 'DATA_STREAM') {
this.itemTypeStyle = baseType + 'stream-label';
- } else if (this.item.type === 'adapter') {
+ } else if (this.item.serviceTagPrefix === 'ADAPTER') {
this.itemTypeStyle = baseType + 'adapter-label';
- } else if (this.item.type === 'sepa') {
+ } else if (this.item.serviceTagPrefix === 'DATA_PROCESSOR') {
this.itemTypeStyle = baseType + 'processor-label';
} else {
this.itemTypeStyle = baseType + 'sink-label';
}
}
- installSingleElement(event: Event, endpointItem) {
+ installSingleElement(event: Event, endpointItem: ExtensionItemDescription) {
const endpointItems = [];
endpointItems.push(endpointItem);
this.triggerInstallation.emit({ endpointItems, install: true });
event.stopPropagation();
}
- uninstallSingleElement(event: Event, endpointItem) {
+ uninstallSingleElement(
+ event: Event,
+ endpointItem: ExtensionItemDescription,
+ ) {
const endpointItems = [];
endpointItems.push(endpointItem);
this.triggerInstallation.emit({ endpointItems, install: false });
event.stopPropagation();
}
- refresh(elementId: string) {
- this.pipelineElementEndpointService
- .update(elementId)
+ refresh(extensionItem: ExtensionItemDescription) {
+ const installationReq: ExtensionItemInstallationRequest = {
+ serviceTagPrefix: extensionItem.serviceTagPrefix,
+ publicElement: false,
+ appId: extensionItem.appId,
+ };
+ this.extensionInstallationService
+ .update(installationReq)
.subscribe(msg => {
this.snackBar.open(msg.notifications[0].title, 'Ok', {
duration: 2000,
});
- })
- .add(() => {
- // this.loadCurrentElements(type);
});
}
showPermissionsDialog(elementId: string, elementName: string) {
- const dialogRef = this.dialogService.open(
- ObjectPermissionDialogComponent,
- {
- panelType: PanelType.SLIDE_IN_PANEL,
- title: 'Manage permissions',
- width: '50vw',
- data: {
- objectInstanceId: elementId,
- headerTitle:
- 'Manage permissions for pipeline element ' +
- elementName,
- },
+ this.dialogService.open(ObjectPermissionDialogComponent, {
+ panelType: PanelType.SLIDE_IN_PANEL,
+ title: 'Manage permissions',
+ width: '50vw',
+ data: {
+ objectInstanceId: elementId,
+ headerTitle:
+ 'Manage permissions for pipeline element ' + elementName,
},
- );
-
- dialogRef.afterClosed().subscribe(refresh => {
- console.log(refresh);
});
}
}
diff --git a/ui/src/app/add/dialogs/endpoint-installation/endpoint-installation.component.ts b/ui/src/app/add/dialogs/endpoint-installation/endpoint-installation.component.ts
index c05b243607..a789f4c816 100644
--- a/ui/src/app/add/dialogs/endpoint-installation/endpoint-installation.component.ts
+++ b/ui/src/app/add/dialogs/endpoint-installation/endpoint-installation.component.ts
@@ -18,7 +18,10 @@
import { Component, Input } from '@angular/core';
import { DialogRef } from '@streampipes/shared-ui';
-import { PipelineElementEndpointService } from '@streampipes/platform-services';
+import {
+ ExtensionInstallationService,
+ ExtensionItemDescription,
+} from '@streampipes/platform-services';
@Component({
selector: 'sp-endpoint-installation-dialog',
@@ -26,7 +29,7 @@ import { PipelineElementEndpointService } from '@streampipes/platform-services';
styleUrls: ['./endpoint-installation.component.scss'],
})
export class EndpointInstallationComponent {
- endpointItems: any;
+ endpointItems: ExtensionItemDescription[];
@Input()
install: boolean;
@@ -46,7 +49,7 @@ export class EndpointInstallationComponent {
constructor(
private dialogRef: DialogRef<EndpointInstallationComponent>,
- private pipelineElementEndpointService: PipelineElementEndpointService,
+ private extensionInstallationService: ExtensionInstallationService,
) {
this.installationStatus = [];
this.installationFinished = false;
@@ -68,25 +71,30 @@ export class EndpointInstallationComponent {
}
}
- initiateInstallation(endpointUri, index) {
+ initiateInstallation(
+ extensionItem: ExtensionItemDescription,
+ index: number,
+ ) {
this.installationRunning = true;
this.installationStatus.push({
- name: endpointUri.name,
+ name: extensionItem.name,
id: index,
status: 'waiting',
});
if (this.install) {
- this.installElement(endpointUri, index);
+ this.installElement(extensionItem, index);
} else {
- this.uninstallElement(endpointUri, index);
+ this.uninstallElement(extensionItem, index);
}
}
- installElement(endpointUri, index) {
- endpointUri = encodeURIComponent(endpointUri.uri);
-
- this.pipelineElementEndpointService
- .add(endpointUri, this.installAsPublicElement)
+ installElement(extensionItem: ExtensionItemDescription, index: number) {
+ this.extensionInstallationService
+ .add({
+ appId: extensionItem.appId,
+ publicElement: this.installAsPublicElement,
+ serviceTagPrefix: extensionItem.serviceTagPrefix,
+ })
.subscribe(
data => {
if (data.success) {
@@ -116,9 +124,9 @@ export class EndpointInstallationComponent {
});
}
- uninstallElement(endpointUri, index) {
- this.pipelineElementEndpointService
- .del(endpointUri.elementId)
+ uninstallElement(extensionItem: ExtensionItemDescription, index: number) {
+ this.extensionInstallationService
+ .delete(extensionItem.elementId)
.subscribe(
data => {
this.installationStatus[index].status = data.success
diff --git a/ui/src/app/add/filter/order-by.pipe.ts b/ui/src/app/add/filter/order-by.pipe.ts
index b73352efb2..f438df3451 100644
--- a/ui/src/app/add/filter/order-by.pipe.ts
+++ b/ui/src/app/add/filter/order-by.pipe.ts
@@ -17,10 +17,15 @@
*/
import { Pipe, PipeTransform } from '@angular/core';
+import { ExtensionItemDescription } from '@streampipes/platform-services';
@Pipe({ name: 'orderBy' })
export class OrderByPipe implements PipeTransform {
- transform(value: any[], order = '', column: string = ''): any[] {
+ transform(
+ value: ExtensionItemDescription[],
+ order = '',
+ column: string = '',
+ ): ExtensionItemDescription[] {
if (!value || order === '' || !order) {
return value;
}
diff --git a/ui/src/app/add/filter/pipeline-element-installation-status.pipe.ts b/ui/src/app/add/filter/pipeline-element-installation-status.pipe.ts
index 3c88f5c563..18679b2acf 100644
--- a/ui/src/app/add/filter/pipeline-element-installation-status.pipe.ts
+++ b/ui/src/app/add/filter/pipeline-element-installation-status.pipe.ts
@@ -17,13 +17,17 @@
*/
import { Pipe, PipeTransform } from '@angular/core';
+import { ExtensionItemDescription } from '@streampipes/platform-services';
@Pipe({
name: 'pipelineElementInstallationStatusFilter',
pure: false,
})
export class PipelineElementInstallationStatusFilter implements PipeTransform {
- transform(values: any[], filterTerm: string): any[] {
+ transform(
+ values: ExtensionItemDescription[],
+ filterTerm: string,
+ ): ExtensionItemDescription[] {
if (filterTerm === 'all') {
return values;
} else {
diff --git a/ui/src/app/add/filter/pipeline-element-name.pipe.ts b/ui/src/app/add/filter/pipeline-element-name.pipe.ts
index 5a650aa20e..ff7970a504 100644
--- a/ui/src/app/add/filter/pipeline-element-name.pipe.ts
+++ b/ui/src/app/add/filter/pipeline-element-name.pipe.ts
@@ -17,13 +17,17 @@
*/
import { Pipe, PipeTransform } from '@angular/core';
+import { ExtensionItemDescription } from '@streampipes/platform-services';
@Pipe({
name: 'pipelineElementNameFilter',
pure: false,
})
export class PipelineElementNameFilter implements PipeTransform {
- transform(values: any[], filterTerm: string): any[] {
+ transform(
+ values: ExtensionItemDescription[],
+ filterTerm: string,
+ ): ExtensionItemDescription[] {
if (filterTerm === '') {
return values;
} else {
diff --git a/ui/src/app/add/filter/pipeline-element-type.pipe.ts b/ui/src/app/add/filter/pipeline-element-type.pipe.ts
index 98ecf9bea7..b08c92e390 100644
--- a/ui/src/app/add/filter/pipeline-element-type.pipe.ts
+++ b/ui/src/app/add/filter/pipeline-element-type.pipe.ts
@@ -17,17 +17,21 @@
*/
import { Pipe, PipeTransform } from '@angular/core';
+import { ExtensionItemDescription } from '@streampipes/platform-services';
@Pipe({
name: 'pipelineElementTypeFilter',
pure: false,
})
export class PipelineElementTypeFilter implements PipeTransform {
- transform(value: any[], type: string): any[] {
- if (type === 'all') {
+ /**
+ * Keeps only the extension items whose `serviceTagPrefix` equals the
+ * requested one; the special value `'all'` disables filtering.
+ *
+ * A missing input list is returned unchanged instead of being filtered,
+ * mirroring the `!value` guard of `OrderByPipe`: this pipe is impure and
+ * may run before the item list has been assigned.
+ *
+ * @param value extension items to filter (may be null/undefined)
+ * @param serviceTagPrefix tag prefix to keep, or `'all'`
+ * @returns the filtered list, or `value` itself when nothing is filtered
+ */
+ transform(
+ value: ExtensionItemDescription[],
+ serviceTagPrefix: string,
+ ): ExtensionItemDescription[] {
+ if (!value || serviceTagPrefix === 'all') {
return value;
} else {
- return value.filter(v => v.type === type);
+ return value.filter(v => v.serviceTagPrefix === serviceTagPrefix);
}
}
}
diff --git a/ui/src/app/add/services/add.service.ts b/ui/src/app/add/services/add.service.ts
index f49b7c259e..e28cd506c6 100644
--- a/ui/src/app/add/services/add.service.ts
+++ b/ui/src/app/add/services/add.service.ts
@@ -19,8 +19,8 @@
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
+ ExtensionItemDescription,
PlatformServicesCommons,
- ExtensionsServiceEndpointItem,
} from '@streampipes/platform-services';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@@ -32,25 +32,21 @@ export class AddService {
private platformServicesCommons: PlatformServicesCommons,
) {}
- getRdfEndpointItems(): Observable<ExtensionsServiceEndpointItem[]> {
+ getRdfEndpointItems(): Observable<ExtensionItemDescription[]> {
return this.http
- .get(
- this.platformServicesCommons.apiBasePath +
- '/rdfendpoints/items',
- )
+ .get(this.platformServicesCommons.apiBasePath + '/extension-items')
.pipe(
map(response => {
return (response as any[]).map(item =>
- ExtensionsServiceEndpointItem.fromData(item),
+ ExtensionItemDescription.fromData(item),
);
}),
);
}
- getRdfEndpointIcon(item: ExtensionsServiceEndpointItem): Observable<any> {
+ getRdfEndpointIcon(item: ExtensionItemDescription): Observable<any> {
return this.http.post(
- this.platformServicesCommons.apiBasePath +
- '/rdfendpoints/items/icon',
+ this.platformServicesCommons.apiBasePath + '/extension-items/icon',
item,
{ responseType: 'blob' },
);