You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/06 13:58:33 UTC

[streampipes] 01/01: [#1491] WIP

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1491
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 098c01cbbd9fd68073e11cc243558f0618f0bc04
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 6 15:58:12 2023 +0200

    [#1491] WIP
---
 .../api/{FileApi.java => FetchFileHandler.java}    | 29 +++-------------------
 .../org/apache/streampipes/client/api/FileApi.java | 10 +++++++-
 .../extensions/management/CacheFilesUtils.java     | 29 ++++++++++++++++++++++
 .../management/util/CachedFetchFileHandler.java    | 29 ++++++++++++++++++++++
 .../connect/iiot/ConnectAdapterIiotInit.java       | 12 +++++++++
 .../connect/iiot/utils/FileProtocolUtils.java      | 22 ++++++++++++++++
 .../extensions/all/jvm/AllExtensionsInit.java      |  9 +++++++
 .../CsvMetadataEnrichmentController.java           | 10 +++++++-
 8 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
similarity index 50%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
index e2c00a859..aaa3bfe14 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
@@ -19,32 +19,9 @@
 package org.apache.streampipes.client.api;
 
 import org.apache.streampipes.client.http.BinaryGetRequest;
-import org.apache.streampipes.client.model.StreamPipesClientConfig;
-import org.apache.streampipes.client.util.StreamPipesApiPath;
 
+public interface FetchFileHandler {
 
-public class FileApi extends AbstractClientApi {
-
-  public FileApi(StreamPipesClientConfig clientConfig) {
-    super(clientConfig);
-  }
-
-  public byte[] getFileContent(String filename) {
-    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
-  }
-
-  public String getFileContentAsString(String filename) {
-    return new String(getFileContent(filename));
-  }
-
-  public void writeToFile(String file, String fileLocation) {
-    new BinaryGetRequest(clientConfig, getBaseResourcePath(file), null)
-        .writeToFile(fileLocation);
-  }
-
-  protected StreamPipesApiPath getBaseResourcePath(String fileName) {
-    return StreamPipesApiPath.fromBaseApiPath()
-        .addToPath("files")
-        .addToPath(fileName);
-  }
+  byte[] getFileContent(String filename, // name of the requested file
+                        BinaryGetRequest apiRequest); // GET request for that file (FileApi passes it unexecuted)
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
index e2c00a859..6b4a9b891 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
@@ -30,7 +30,15 @@ public class FileApi extends AbstractClientApi {
   }
 
   public byte[] getFileContent(String filename) {
-    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
+    return getRequest(filename).executeRequest(); // executes the request directly, without a handler
+  }
+
+  public byte[] getFileContent(String filename, FetchFileHandler handler) {
+    return handler.getFileContent(filename, getRequest(filename)); // request is handed over unexecuted
+  }
+
+  private BinaryGetRequest getRequest(String filename) {
+    return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null); // built only, not executed
   }
 
   public String getFileContentAsString(String filename) {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
new file mode 100644
index 000000000..b37d83d63
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management;
+
+import org.apache.streampipes.client.StreamPipesClient;
+
+public class CacheFilesUtils {
+  // NOTE(review): placeholder — ignores both arguments and always returns an empty array.
+  public static byte[] getFileContent(String filename, StreamPipesClient client) {
+    return new byte[0];
+  }
+
+}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
new file mode 100644
index 000000000..fc72e74e3
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.util;
+
+import org.apache.streampipes.client.api.FetchFileHandler;
+import org.apache.streampipes.client.http.BinaryGetRequest;
+
+public class CachedFetchFileHandler implements FetchFileHandler {
+  @Override
+  public byte[] getFileContent(String filename, BinaryGetRequest apiRequest) {
+    return apiRequest.executeRequest(); // TODO: add caching; until then fetch on every call
+  }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index fab0dde2e..a784bf133 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -34,11 +34,15 @@ import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
 import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 
+import jakarta.annotation.PreDestroy;
+
 public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
+
   public static void main(String[] args) {
     new ConnectAdapterIiotInit().init();
   }
@@ -67,4 +71,12 @@ public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
         .registerAdapter(new TubeMQProtocol())
         .build();
   }
+
+
+  @PreDestroy
+  @Override
+  public void onExit() {
+    super.onExit(); // let the base class run its own exit handling first
+    FileProtocolUtils.deleteServiceStorageDir(); // then remove this service's storage directory
+  }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
index e7d697c58..3657ae74d 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -22,6 +22,10 @@ import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -30,6 +34,8 @@ import java.io.InputStream;
 
 public class FileProtocolUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FileProtocolUtils.class);
+
   public static InputStream getFileInputStream(String selectedFilename) throws FileNotFoundException {
     if (!isFilePresent(selectedFilename)) {
       try {
@@ -42,6 +48,21 @@ public class FileProtocolUtils {
     return new FileInputStream(makeFileLoc(selectedFilename));
   }
 
+  /** Recursively deletes the service storage directory; an IOException is logged instead of rethrown. */
+  public static void deleteServiceStorageDir() {
+    var directory = new File(makeServiceStorageDir());
+    if (!directory.exists()) {
+      LOG.info("Tried to delete directory {} but it did not exist", directory);
+      return;
+    }
+    try {
+      FileUtils.deleteDirectory(directory);
+      LOG.info("Directory {} was removed successfully", directory);
+    } catch (IOException e) {
+      LOG.error("There was a problem removing the directory {}", directory, e);
+    }
+  }
+
   private static boolean isFilePresent(String selectedFilename) {
     File file = new File(makeFileLoc(selectedFilename));
     return file.exists();
@@ -65,6 +86,7 @@ public class FileProtocolUtils {
         + "service";
   }
 
+
   private static String makeFileLoc(String filename) {
     return makeServiceStorageDir() + File.separator + filename;
   }
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index db9332361..ada515923 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.extensions.all.jvm;
 
 import org.apache.streampipes.connect.iiot.ConnectAdapterIiotInit;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -31,6 +32,7 @@ import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
 import org.apache.streampipes.pe.jvm.AllPipelineElementsInit;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 
+import jakarta.annotation.PreDestroy;
 
 public class AllExtensionsInit extends ExtensionsModelSubmitter {
 
@@ -58,4 +60,11 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
             new SpNatsProtocolFactory())
         .build();
   }
+
+  @PreDestroy
+  @Override
+  public void onExit() {
+    super.onExit(); // let the base class run its own exit handling first
+    FileProtocolUtils.deleteServiceStorageDir(); // then remove this service's storage directory
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
index a04050bdb..161933511 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
@@ -22,7 +22,9 @@ import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
+import org.apache.streampipes.extensions.management.CacheFilesUtils;
 import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.extensions.management.util.CachedFetchFileHandler;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -183,7 +185,13 @@ public class CsvMetadataEnrichmentController
 
   private String getFileContents(AbstractParameterExtractor<?> extractor) {
     String filename = extractor.selectedFilename(CSV_FILE_KEY);
-    return getStreamPipesClientInstance().fileApi().getFileContentAsString(filename);
+    var handler = new CachedFetchFileHandler();
+    // NOTE(review): this value is never read; looks like leftover WIP scaffolding, confirm and remove.
+    var fileUtils = CacheFilesUtils.getFileContent(filename, getStreamPipesClientInstance());
+
+    // Return the fetched bytes as text (platform default charset) rather than a constant empty string.
+    var result = getStreamPipesClientInstance().fileApi().getFileContent(filename, handler);
+    return new String(result);
   }
 
   private StreamPipesClient getStreamPipesClientInstance() {