You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/23 22:00:41 UTC

[incubator-streampipes] branch dev updated (62260ca -> 7538b9a)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 62260ca  [STREAMPIPES-372] Add pipeline health check and restore feature
     new 184bc66  [STREAMPIPES-374] Improve management of SecretStaticProperty values
     new 7538b9a  [STREAMPIPES-373] Modify expected input stream for dashboard sinks

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/util/AdapterEncryptionService.java      |  33 +-----
 .../manager/execution/http/PipelineExecutor.java   |  43 +++-----
 .../execution/http/PipelineStorageService.java     |  88 +++++++---------
 .../manager/health/PipelineHealthCheck.java        |   4 +-
 .../streampipes/manager/secret/ISecretHandler.java |   7 +-
 .../manager/secret/SecretDecrypter.java            |  32 +++---
 .../manager/secret/SecretEncrypter.java            |  34 +++---
 .../streampipes/manager/secret/SecretProvider.java |  19 ++--
 .../SecretService.java}                            |  43 ++++----
 .../streampipes/manager/secret/SecretVisitor.java  | 116 +++++++++++++++++++++
 .../rest/impl/dashboard/VisualizablePipeline.java  |  21 +++-
 11 files changed, 258 insertions(+), 182 deletions(-)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/messaging/IMessageListener.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/ISecretHandler.java (81%)
 copy streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/NotificationListener.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretDecrypter.java (54%)
 copy streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/NotificationListener.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretEncrypter.java (53%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/ElementNotFoundException.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretProvider.java (69%)
 copy streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/{matching/GroundingSelector.java => secret/SecretService.java} (51%)
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretVisitor.java

[incubator-streampipes] 02/02: [STREAMPIPES-373] Modify expected input stream for dashboard sinks

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 7538b9a850464df6196c1bff74f51030c4ae4409
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 00:00:14 2021 +0200

    [STREAMPIPES-373] Modify expected input stream for dashboard sinks
---
 .../execution/http/PipelineStorageService.java     | 70 +++++++++++-----------
 .../manager/health/PipelineHealthCheck.java        |  4 +-
 .../rest/impl/dashboard/VisualizablePipeline.java  | 21 ++++++-
 3 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
index d59a4b3..b80ec80 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
@@ -33,48 +33,48 @@ import java.util.stream.Collectors;
 
 public class PipelineStorageService {
 
-    private Pipeline pipeline;
+  private Pipeline pipeline;
 
-    public PipelineStorageService(Pipeline pipeline) {
-        this.pipeline = pipeline;
-    }
+  public PipelineStorageService(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
 
-    public void updatePipeline() {
-     encryptSecrets(pipeline);
-     StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
-    }
+  public void updatePipeline() {
+    preparePipeline();
+    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+  }
 
-    public void addPipeline() {
-        preparePipeline();
-        StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
-    }
+  public void addPipeline() {
+    preparePipeline();
+    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
+  }
 
-    private void preparePipeline() {
-        PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-        InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
-        List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
-        encryptSecrets(graphs);
+  private void preparePipeline() {
+    PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
+    InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
+    List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
+    encryptSecrets(graphs);
 
-        List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
-        List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
+    List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
+    List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
 
-        pipeline.setSepas(sepas);
-        pipeline.setActions(secs);
-    }
+    pipeline.setSepas(sepas);
+    pipeline.setActions(secs);
+  }
 
-    private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
-    }
+  private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+    SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
+  }
 
-    private void encryptSecrets(Pipeline pipeline) {
-        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
-    }
+  private void encryptSecrets(Pipeline pipeline) {
+    SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
+  }
 
-    private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
-        return graphs
-                .stream()
-                .filter(clazz::isInstance)
-                .map(clazz::cast)
-                .collect(Collectors.toList());
-    }
+  private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+    return graphs
+            .stream()
+            .filter(clazz::isInstance)
+            .map(clazz::cast)
+            .collect(Collectors.toList());
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 0bbde50..245110e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -33,8 +33,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.manager.operations.Operations.updatePipeline;
-
 public class PipelineHealthCheck implements Runnable {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
@@ -77,7 +75,7 @@ public class PipelineHealthCheck implements Runnable {
                 LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName());
               }
               pipeline.setPipelineNotifications(pipelineNotifications);
-              updatePipeline(pipeline);
+              StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
             }
           }
         });
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
index d26881d..648d454 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
@@ -80,7 +80,7 @@ public class VisualizablePipeline extends AbstractRestResource {
                 visualizablePipeline.setPipelineName(pipeline.getName());
                 visualizablePipeline.setVisualizationName(extractVisualizationName(sink));
                 visualizablePipeline.setSchema(sink.getInputStreams().get(0).getEventSchema());
-                visualizablePipeline.setTopic(sink.getElementId().substring(sink.getElementId().lastIndexOf(Slash) + 1));
+                visualizablePipeline.setTopic(makeTopic(sink));
 
                 visualizablePipelines.add(visualizablePipeline);
               });
@@ -89,6 +89,25 @@ public class VisualizablePipeline extends AbstractRestResource {
      return visualizablePipelines;
   }
 
+  private String makeTopic(DataSinkInvocation sink) {
+    return extractInputTopic(sink) + "-" + normalize(extractVisualizationName(sink));
+  }
+
+  private String extractInputTopic(DataSinkInvocation sink) {
+    return sink
+            .getInputStreams()
+            .get(0)
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
+  }
+
+  private String normalize(String visualizationName) {
+    return visualizationName.replaceAll(" ", "").toLowerCase();
+  }
+
+
   private String extractVisualizationName(DataSinkInvocation sink) {
     return sink.getStaticProperties()
             .stream()

[incubator-streampipes] 01/02: [STREAMPIPES-374] Improve management of SecretStaticProperty values

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 184bc66ee3c0209ec7d50ba624047a9ac6029e20
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun May 23 16:45:08 2021 +0200

    [STREAMPIPES-374] Improve management of SecretStaticProperty values
---
 .../master/util/AdapterEncryptionService.java      |  33 +-----
 .../manager/execution/http/PipelineExecutor.java   |  43 +++-----
 .../execution/http/PipelineStorageService.java     |  34 ++----
 .../streampipes/manager/secret/ISecretHandler.java |  26 +++++
 .../manager/secret/SecretDecrypter.java            |  45 ++++++++
 .../manager/secret/SecretEncrypter.java            |  43 ++++++++
 .../streampipes/manager/secret/SecretProvider.java |  30 ++++++
 .../streampipes/manager/secret/SecretService.java  |  52 +++++++++
 .../streampipes/manager/secret/SecretVisitor.java  | 116 +++++++++++++++++++++
 9 files changed, 338 insertions(+), 84 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
index b795bee..984cf58 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/AdapterEncryptionService.java
@@ -17,13 +17,11 @@
  */
 package org.apache.streampipes.connect.container.master.util;
 
+import org.apache.streampipes.manager.secret.SecretProvider;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
 
-import java.security.GeneralSecurityException;
 import java.util.List;
 
 public class AdapterEncryptionService {
@@ -58,35 +56,10 @@ public class AdapterEncryptionService {
   }
 
   private void encrypt(List<StaticProperty> staticProperties) {
-    staticProperties
-            .stream()
-            .filter(SecretStaticProperty.class::isInstance)
-            .forEach(secret -> {
-              if (!((SecretStaticProperty) secret).getEncrypted()) {
-                try {
-                  String encrypted = CredentialsManager.encrypt(ad.getUserName(),
-                          ((SecretStaticProperty) secret).getValue());
-                  ((SecretStaticProperty) secret).setValue(encrypted);
-                  ((SecretStaticProperty) secret).setEncrypted(true);
-                } catch (GeneralSecurityException e) {
-                  e.printStackTrace();
-                }
-              }
-            });
+    SecretProvider.getEncryptionService(ad.getUserName()).applyConfig(staticProperties);
   }
 
   private void decrypt(List<StaticProperty> staticProperties) {
-    staticProperties.stream()
-            .filter(SecretStaticProperty.class::isInstance)
-            .forEach(sp -> {
-              try {
-                String decrypted = CredentialsManager.decrypt(ad.getUserName(),
-                        ((SecretStaticProperty) sp).getValue());
-                ((SecretStaticProperty) sp).setValue(decrypted);
-                ((SecretStaticProperty) sp).setEncrypted(false);
-              } catch (GeneralSecurityException e) {
-                e.printStackTrace();
-              }
-            });
+    SecretProvider.getDecryptionService(ad.getUserName()).applyConfig(staticProperties);
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index c8d3e68..5775753 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.manager.execution.http;
 
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.secret.SecretProvider;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -30,13 +31,10 @@ import org.apache.streampipes.model.message.PipelineStatusMessageType;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
 import org.lightcouch.DocumentConflictException;
 
-import java.security.GeneralSecurityException;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -79,14 +77,16 @@ public class PipelineExecutor {
     graphs.addAll(sepas);
     graphs.addAll(secs);
 
-    List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
+    decryptSecrets(graphs);
 
-    graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
+    //graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
 
     PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(), decryptedGraphs, dataSets)
+            pipeline.getName(), graphs, dataSets)
             .invokeGraphs();
 
+    encryptSecrets(graphs);
+
     if (status.isSuccess()) {
       storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
 
@@ -110,31 +110,12 @@ public class PipelineExecutor {
             .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
   }
 
-  private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
-    graphs.stream().map(g -> {
-      if (g instanceof DataProcessorInvocation) {
-        return new DataProcessorInvocation((DataProcessorInvocation) g);
-      } else {
-        return new DataSinkInvocation((DataSinkInvocation) g);
-      }
-    }).forEach(g -> {
-      g.getStaticProperties()
-              .stream()
-              .filter(SecretStaticProperty.class::isInstance)
-              .forEach(sp -> {
-                try {
-                  String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
-                          ((SecretStaticProperty) sp).getValue());
-                  ((SecretStaticProperty) sp).setValue(decrypted);
-                  ((SecretStaticProperty) sp).setEncrypted(false);
-                } catch (GeneralSecurityException e) {
-                  e.printStackTrace();
-                }
-              });
-      decryptedGraphs.add(g);
-    });
-    return decryptedGraphs;
+  private void decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+    SecretProvider.getDecryptionService(pipeline.getCreatedByUser()).apply(graphs);
+  }
+
+  private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+    SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
   }
 
   public PipelineOperationStatus stopPipeline() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
index e0fc07e..d59a4b3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
@@ -21,15 +21,13 @@ package org.apache.streampipes.manager.execution.http;
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
 import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.manager.secret.SecretProvider;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
 
-import java.security.GeneralSecurityException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -42,7 +40,7 @@ public class PipelineStorageService {
     }
 
     public void updatePipeline() {
-     preparePipeline();
+     encryptSecrets(pipeline);
      StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
     }
 
@@ -54,7 +52,8 @@ public class PipelineStorageService {
     private void preparePipeline() {
         PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
         InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
-        List<InvocableStreamPipesEntity> graphs = encryptSecrets(builder.buildGraphs());
+        List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
+        encryptSecrets(graphs);
 
         List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
         List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
@@ -63,23 +62,12 @@ public class PipelineStorageService {
         pipeline.setActions(secs);
     }
 
-    private List<InvocableStreamPipesEntity> encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-        graphs.forEach(g -> g.getStaticProperties()
-                .stream()
-                .filter(SecretStaticProperty.class::isInstance)
-                .forEach(secret -> {
-                    if (!((SecretStaticProperty) secret).getEncrypted()) {
-                        try {
-                            String encrypted = CredentialsManager.encrypt(pipeline.getCreatedByUser(),
-                                    ((SecretStaticProperty) secret).getValue());
-                            ((SecretStaticProperty) secret).setValue(encrypted);
-                            ((SecretStaticProperty) secret).setEncrypted(true);
-                        } catch (GeneralSecurityException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }));
-        return graphs;
+    private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
+    }
+
+    private void encryptSecrets(Pipeline pipeline) {
+        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
     }
 
     private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/ISecretHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/ISecretHandler.java
new file mode 100644
index 0000000..2537ce3
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/ISecretHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+public interface ISecretHandler {
+
+  String apply(String username, String extractedValue);
+
+  boolean shouldApply(boolean encrypted);
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretDecrypter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretDecrypter.java
new file mode 100644
index 0000000..9685632
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretDecrypter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+
+import org.apache.streampipes.user.management.encryption.CredentialsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.GeneralSecurityException;
+
+public class SecretDecrypter implements ISecretHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SecretDecrypter.class);
+
+  @Override
+  public String apply(String username, String extractedValue) {
+    try {
+      return CredentialsManager.decrypt(username, extractedValue);
+    } catch (GeneralSecurityException e) {
+      LOG.error("Could not decrypt value, returning original value");
+      return extractedValue;
+    }
+  }
+
+  @Override
+  public boolean shouldApply(boolean encrypted) {
+    return encrypted;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretEncrypter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretEncrypter.java
new file mode 100644
index 0000000..877858d
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretEncrypter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+import org.apache.streampipes.user.management.encryption.CredentialsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.GeneralSecurityException;
+
+public class SecretEncrypter implements ISecretHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SecretEncrypter.class);
+  @Override
+  public String apply(String username, String extractedValue) {
+    try {
+      return CredentialsManager.encrypt(username, extractedValue);
+    } catch (GeneralSecurityException e) {
+      LOG.error("Could not encrypt value, returning original value");
+      return extractedValue;
+    }
+  }
+
+  @Override
+  public boolean shouldApply(boolean encrypted) {
+    return !encrypted;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretProvider.java
new file mode 100644
index 0000000..9676677
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+public class SecretProvider {
+
+  public static SecretService getEncryptionService(String username) {
+    return new SecretService(username, new SecretEncrypter()) {
+    };
+  }
+
+  public static SecretService getDecryptionService(String username) {
+    return new SecretService(username, new SecretDecrypter());
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretService.java
new file mode 100644
index 0000000..4f4223b
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+
+import java.util.List;
+
+public class SecretService {
+
+  private SecretVisitor visitor;
+
+  public SecretService(String username,
+                       ISecretHandler secretHandler) {
+    this.visitor = new SecretVisitor(username, secretHandler);
+  }
+
+  public void apply(Pipeline pipeline) {
+    pipeline.getSepas().forEach(this::apply);
+    pipeline.getActions().forEach(this::apply);
+  }
+
+  public void apply(List<InvocableStreamPipesEntity> graphs) {
+    graphs.forEach(this::apply);
+  }
+
+  public void apply(InvocableStreamPipesEntity graph) {
+    applyConfig(graph.getStaticProperties());
+  }
+
+  public void applyConfig(List<StaticProperty> staticProperties) {
+    staticProperties.forEach(sp -> sp.accept(visitor));
+  }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretVisitor.java
new file mode 100644
index 0000000..de51d4e
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/secret/SecretVisitor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.secret;
+
+import org.apache.streampipes.model.staticproperty.*;
+
+public class SecretVisitor implements StaticPropertyVisitor {
+
+  private String username;
+  private ISecretHandler secretHandler;
+
+  public SecretVisitor(String username,
+                       ISecretHandler secretHandler) {
+    this.username = username;
+    this.secretHandler = secretHandler;
+  }
+
+  @Override
+  public void visit(AnyStaticProperty property) {
+
+  }
+
+  @Override
+  public void visit(CodeInputStaticProperty codeInputStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(CollectionStaticProperty collectionStaticProperty) {
+    collectionStaticProperty.getMembers().forEach(sp -> sp.accept(this));
+  }
+
+  @Override
+  public void visit(ColorPickerStaticProperty colorPickerStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(DomainStaticProperty domainStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(FileStaticProperty fileStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(FreeTextStaticProperty freeTextStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(MappingPropertyNary mappingPropertyNary) {
+
+  }
+
+  @Override
+  public void visit(MappingPropertyUnary mappingPropertyUnary) {
+
+  }
+
+  @Override
+  public void visit(MatchingStaticProperty matchingStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(OneOfStaticProperty oneOfStaticProperty) {
+
+  }
+
+  @Override
+  public void visit(SecretStaticProperty secretStaticProperty) {
+    if (secretHandler.shouldApply(secretStaticProperty.getEncrypted())) {
+      String newValue = secretHandler.apply(username, secretStaticProperty.getValue());
+      secretStaticProperty.setValue(newValue);
+      secretStaticProperty.setEncrypted(!secretStaticProperty.getEncrypted());
+    }
+  }
+
+  @Override
+  public void visit(StaticPropertyAlternative staticPropertyAlternative) {
+    staticPropertyAlternative.getStaticProperty().accept(this);
+  }
+
+  @Override
+  public void visit(StaticPropertyAlternatives staticPropertyAlternatives) {
+    staticPropertyAlternatives.getAlternatives().forEach(sp -> sp.accept(this));
+  }
+
+  @Override
+  public void visit(StaticPropertyGroup staticPropertyGroup) {
+    staticPropertyGroup.getStaticProperties().forEach(sp -> sp.accept(this));
+  }
+
+  @Override
+  public void visit(RemoteOneOfStaticProperty remoteOneOfStaticProperty) {
+
+  }
+}