You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/06 13:58:32 UTC

[streampipes] branch SP-1491 created (now 098c01cbb)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch SP-1491
in repository https://gitbox.apache.org/repos/asf/streampipes.git


      at 098c01cbb [#1491] WIP

This branch includes the following new commits:

     new 098c01cbb [#1491] WIP

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[streampipes] 01/01: [#1491] WIP

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1491
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 098c01cbbd9fd68073e11cc243558f0618f0bc04
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 6 15:58:12 2023 +0200

    [#1491] WIP
---
 .../api/{FileApi.java => FetchFileHandler.java}    | 29 +++-------------------
 .../org/apache/streampipes/client/api/FileApi.java | 10 +++++++-
 .../extensions/management/CacheFilesUtils.java     | 29 ++++++++++++++++++++++
 .../management/util/CachedFetchFileHandler.java    | 29 ++++++++++++++++++++++
 .../connect/iiot/ConnectAdapterIiotInit.java       | 12 +++++++++
 .../connect/iiot/utils/FileProtocolUtils.java      | 22 ++++++++++++++++
 .../extensions/all/jvm/AllExtensionsInit.java      |  9 +++++++
 .../CsvMetadataEnrichmentController.java           | 10 +++++++-
 8 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
similarity index 50%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
index e2c00a859..aaa3bfe14 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
@@ -19,32 +19,9 @@
 package org.apache.streampipes.client.api;
 
 import org.apache.streampipes.client.http.BinaryGetRequest;
-import org.apache.streampipes.client.model.StreamPipesClientConfig;
-import org.apache.streampipes.client.util.StreamPipesApiPath;
 
+public interface FetchFileHandler {
 
-public class FileApi extends AbstractClientApi {
-
-  public FileApi(StreamPipesClientConfig clientConfig) {
-    super(clientConfig);
-  }
-
-  public byte[] getFileContent(String filename) {
-    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
-  }
-
-  public String getFileContentAsString(String filename) {
-    return new String(getFileContent(filename));
-  }
-
-  public void writeToFile(String file, String fileLocation) {
-    new BinaryGetRequest(clientConfig, getBaseResourcePath(file), null)
-        .writeToFile(fileLocation);
-  }
-
-  protected StreamPipesApiPath getBaseResourcePath(String fileName) {
-    return StreamPipesApiPath.fromBaseApiPath()
-        .addToPath("files")
-        .addToPath(fileName);
-  }
+  byte[] getFileContent(String filename,
+                        BinaryGetRequest apiRequest);
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
index e2c00a859..6b4a9b891 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
@@ -30,7 +30,15 @@ public class FileApi extends AbstractClientApi {
   }
 
   public byte[] getFileContent(String filename) {
-    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
+    return getRequest(filename).executeRequest();
+  }
+
+  public byte[] getFileContent(String filename, FetchFileHandler handler) {
+    return handler.getFileContent(filename, getRequest(filename));
+  }
+
+  private BinaryGetRequest getRequest(String filename) {
+    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null);
   }
 
   public String getFileContentAsString(String filename) {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
new file mode 100644
index 000000000..b37d83d63
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management;
+
+import org.apache.streampipes.client.StreamPipesClient;
+
+public class CacheFilesUtils {
+
+  public static byte[] getFileContent(String filename, StreamPipesClient client) {
+    return new byte[0];
+  }
+
+}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
new file mode 100644
index 000000000..fc72e74e3
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.util;
+
+import org.apache.streampipes.client.api.FetchFileHandler;
+import org.apache.streampipes.client.http.BinaryGetRequest;
+
+public class CachedFetchFileHandler implements FetchFileHandler {
+  @Override
+  public byte[] getFileContent(String filename, BinaryGetRequest apiRequest) {
+    return new byte[0];
+  }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index fab0dde2e..a784bf133 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -34,11 +34,15 @@ import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
 import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 
+import jakarta.annotation.PreDestroy;
+
 public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
+
   public static void main(String[] args) {
     new ConnectAdapterIiotInit().init();
   }
@@ -67,4 +71,12 @@ public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
         .registerAdapter(new TubeMQProtocol())
         .build();
   }
+
+
+  @PreDestroy
+  @Override
+  public void onExit() {
+    super.onExit();
+    FileProtocolUtils.deleteServiceStorageDir();
+  }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
index e7d697c58..3657ae74d 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -22,6 +22,10 @@ import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -30,6 +34,8 @@ import java.io.InputStream;
 
 public class FileProtocolUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FileProtocolUtils.class);
+
   public static InputStream getFileInputStream(String selectedFilename) throws FileNotFoundException {
     if (!isFilePresent(selectedFilename)) {
       try {
@@ -42,6 +48,21 @@ public class FileProtocolUtils {
     return new FileInputStream(makeFileLoc(selectedFilename));
   }
 
+  public static void deleteServiceStorageDir() {
+    var directoryLocation = makeServiceStorageDir();
+    File directory = new File(directoryLocation);
+    if (directory.exists()) {
+      try {
+        FileUtils.deleteDirectory(directory);
+        LOG.info("Directory %s was removed sucessfully".formatted(directoryLocation));
+      } catch (IOException e) {
+        LOG.error("There was a problem removing the directory %s".formatted(directoryLocation), e);
+      }
+    } else {
+      LOG.info("Tried to delete directory %s but it did not exist".formatted(directoryLocation));
+    }
+  }
+
   private static boolean isFilePresent(String selectedFilename) {
     File file = new File(makeFileLoc(selectedFilename));
     return file.exists();
@@ -65,6 +86,7 @@ public class FileProtocolUtils {
         + "service";
   }
 
+
   private static String makeFileLoc(String filename) {
     return makeServiceStorageDir() + File.separator + filename;
   }
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index db9332361..ada515923 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.extensions.all.jvm;
 
 import org.apache.streampipes.connect.iiot.ConnectAdapterIiotInit;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -31,6 +32,7 @@ import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
 import org.apache.streampipes.pe.jvm.AllPipelineElementsInit;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 
+import jakarta.annotation.PreDestroy;
 
 public class AllExtensionsInit extends ExtensionsModelSubmitter {
 
@@ -58,4 +60,11 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
             new SpNatsProtocolFactory())
         .build();
   }
+
+  @PreDestroy
+  @Override
+  public void onExit() {
+    super.onExit();
+    FileProtocolUtils.deleteServiceStorageDir();
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
index a04050bdb..161933511 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
@@ -22,7 +22,9 @@ import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
+import org.apache.streampipes.extensions.management.CacheFilesUtils;
 import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.extensions.management.util.CachedFetchFileHandler;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -183,7 +185,13 @@ public class CsvMetadataEnrichmentController
 
   private String getFileContents(AbstractParameterExtractor<?> extractor) {
     String filename = extractor.selectedFilename(CSV_FILE_KEY);
-    return getStreamPipesClientInstance().fileApi().getFileContentAsString(filename);
+    var handler = new CachedFetchFileHandler();
+    var fileUtils = CacheFilesUtils.getFileContent(filename, getStreamPipesClientInstance());
+
+
+    var result = getStreamPipesClientInstance().fileApi().getFileContent(filename, handler);
+
+    return "";
   }
 
   private StreamPipesClient getStreamPipesClientInstance() {