You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/07/28 09:43:47 UTC
[incubator-streampipes] 04/04: [hotfix] Filter transformation rules for timestamp rules
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit bf076736ffc6bf34e9e114ade234052463f394a2
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jul 28 11:43:32 2022 +0200
[hotfix] Filter transformation rules for timestamp rules
---
.../container/worker/management/AdapterWorkerManagement.java | 6 ++----
.../connect/container/worker/rest/AdapterWorkerResource.java | 12 ++++++++----
.../model/connect/adapter/AdapterDescription.java | 4 +++-
3 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 1c8d6065c..f250ac7b3 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -80,11 +80,9 @@ public class AdapterWorkerManagement {
IAdapter<?> adapter = RunningAdapterInstances.INSTANCE.removeAdapter(elementId);
- if (adapter == null) {
- throw new AdapterException("Adapter with id " + elementId + " was not found in this container and cannot be stopped.");
+ if (adapter != null) {
+ adapter.stopAdapter();
}
-
- adapter.stopAdapter();
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index e4235906f..6fd89be1b 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -82,15 +82,19 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
@Produces(MediaType.APPLICATION_JSON)
public Response stopStreamAdapter(AdapterStreamDescription adapterStreamDescription) {
+ String responseMessage;
try {
- adapterManagement.stopStreamAdapter(adapterStreamDescription);
+ if (adapterStreamDescription.isRunning()) {
+ adapterManagement.stopStreamAdapter(adapterStreamDescription);
+ responseMessage = "Stream adapter with id " + adapterStreamDescription.getElementId() + " successfully stopped";
+ } else {
+ responseMessage = "Stream adapter with id " + adapterStreamDescription.getElementId() + " seems not to be running";
+ }
} catch (AdapterException e) {
- logger.error("Error while stopping adapter with id " + adapterStreamDescription.getUri(), e);
+ logger.error("Error while stopping adapter with id " + adapterStreamDescription.getElementId(), e);
return ok(Notifications.error(e.getMessage()));
}
- String responseMessage = "Stream adapter with id " + adapterStreamDescription.getUri() + " successfully stopped";
-
logger.info(responseMessage);
return ok(Notifications.success(responseMessage));
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index 24eb04a32..173f2e132 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;
import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.shared.annotation.TsModel;
@@ -165,8 +166,9 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
public List<TransformationRuleDescription> getValueRules() {
var tmp = new ArrayList<TransformationRuleDescription>();
rules.forEach(rule -> {
- if(rule instanceof ValueTransformationRuleDescription)
+ if(rule instanceof ValueTransformationRuleDescription && !(rule instanceof AddTimestampRuleDescription)) {
tmp.add(rule);
+ }
});
return tmp;
}