You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/07/28 09:43:47 UTC

[incubator-streampipes] 04/04: [hotfix] Filter transformation rules for timestamp rules

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit bf076736ffc6bf34e9e114ade234052463f394a2
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jul 28 11:43:32 2022 +0200

    [hotfix] Filter transformation rules for timestamp rules
---
 .../container/worker/management/AdapterWorkerManagement.java |  6 ++----
 .../connect/container/worker/rest/AdapterWorkerResource.java | 12 ++++++++----
 .../model/connect/adapter/AdapterDescription.java            |  4 +++-
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 1c8d6065c..f250ac7b3 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -80,11 +80,9 @@ public class AdapterWorkerManagement {
 
         IAdapter<?> adapter = RunningAdapterInstances.INSTANCE.removeAdapter(elementId);
 
-        if (adapter == null) {
-            throw new AdapterException("Adapter with id " + elementId + " was not found in this container and cannot be stopped.");
+        if (adapter != null) {
+            adapter.stopAdapter();
         }
-
-        adapter.stopAdapter();
     }
 
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index e4235906f..6fd89be1b 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -82,15 +82,19 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
     @Produces(MediaType.APPLICATION_JSON)
     public Response stopStreamAdapter(AdapterStreamDescription adapterStreamDescription) {
 
+        String responseMessage;
         try {
-            adapterManagement.stopStreamAdapter(adapterStreamDescription);
+            if (adapterStreamDescription.isRunning()) {
+                adapterManagement.stopStreamAdapter(adapterStreamDescription);
+                responseMessage = "Stream adapter with id " + adapterStreamDescription.getElementId() + " successfully stopped";
+            } else {
+                responseMessage = "Stream adapter with id " + adapterStreamDescription.getElementId() + " seems not to be running";
+            }
         } catch (AdapterException e) {
-            logger.error("Error while stopping adapter with id " + adapterStreamDescription.getUri(), e);
+            logger.error("Error while stopping adapter with id " + adapterStreamDescription.getElementId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
 
-        String responseMessage = "Stream adapter with id " + adapterStreamDescription.getUri() + " successfully stopped";
-
         logger.info(responseMessage);
         return ok(Notifications.success(responseMessage));
     }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index 24eb04a32..173f2e132 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.shared.annotation.TsModel;
@@ -165,8 +166,9 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
     public List<TransformationRuleDescription> getValueRules() {
         var tmp = new ArrayList<TransformationRuleDescription>();
         rules.forEach(rule -> {
-            if(rule instanceof ValueTransformationRuleDescription)
+            if(rule instanceof ValueTransformationRuleDescription && !(rule instanceof AddTimestampRuleDescription)) {
                 tmp.add(rule);
+            }
         });
         return tmp;
     }