You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/06 13:58:33 UTC
[streampipes] 01/01: [#1491] WIP
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1491
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 098c01cbbd9fd68073e11cc243558f0618f0bc04
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 6 15:58:12 2023 +0200
[#1491] WIP
---
.../api/{FileApi.java => FetchFileHandler.java} | 29 +++-------------------
.../org/apache/streampipes/client/api/FileApi.java | 10 +++++++-
.../extensions/management/CacheFilesUtils.java | 29 ++++++++++++++++++++++
.../management/util/CachedFetchFileHandler.java | 29 ++++++++++++++++++++++
.../connect/iiot/ConnectAdapterIiotInit.java | 12 +++++++++
.../connect/iiot/utils/FileProtocolUtils.java | 22 ++++++++++++++++
.../extensions/all/jvm/AllExtensionsInit.java | 9 +++++++
.../CsvMetadataEnrichmentController.java | 10 +++++++-
8 files changed, 122 insertions(+), 28 deletions(-)
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
similarity index 50%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
index e2c00a859..aaa3bfe14 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FetchFileHandler.java
@@ -19,32 +19,9 @@
package org.apache.streampipes.client.api;
import org.apache.streampipes.client.http.BinaryGetRequest;
-import org.apache.streampipes.client.model.StreamPipesClientConfig;
-import org.apache.streampipes.client.util.StreamPipesApiPath;
+public interface FetchFileHandler {
-public class FileApi extends AbstractClientApi {
-
- public FileApi(StreamPipesClientConfig clientConfig) {
- super(clientConfig);
- }
-
- public byte[] getFileContent(String filename) {
- return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
- }
-
- public String getFileContentAsString(String filename) {
- return new String(getFileContent(filename));
- }
-
- public void writeToFile(String file, String fileLocation) {
- new BinaryGetRequest(clientConfig, getBaseResourcePath(file), null)
- .writeToFile(fileLocation);
- }
-
- protected StreamPipesApiPath getBaseResourcePath(String fileName) {
- return StreamPipesApiPath.fromBaseApiPath()
- .addToPath("files")
- .addToPath(fileName);
- }
+ byte[] getFileContent(String filename,
+ BinaryGetRequest apiRequest);
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
index e2c00a859..6b4a9b891 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/FileApi.java
@@ -30,7 +30,15 @@ public class FileApi extends AbstractClientApi {
}
public byte[] getFileContent(String filename) {
- return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null).executeRequest();
+ return getRequest(filename).executeRequest();
+ }
+
+ public byte[] getFileContent(String filename, FetchFileHandler handler) {
+ return handler.getFileContent(filename, getRequest(filename));
+ }
+
+ private BinaryGetRequest getRequest(String filename) {
+ return new BinaryGetRequest(clientConfig, getBaseResourcePath(filename), null);
}
public String getFileContentAsString(String filename) {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
new file mode 100644
index 000000000..b37d83d63
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/CacheFilesUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management;
+
+import org.apache.streampipes.client.StreamPipesClient;
+
+public class CacheFilesUtils {
+
+ public static byte[] getFileContent(String filename, StreamPipesClient client) {
+ return new byte[0];
+ }
+
+}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
new file mode 100644
index 000000000..fc72e74e3
--- /dev/null
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/CachedFetchFileHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.util;
+
+import org.apache.streampipes.client.api.FetchFileHandler;
+import org.apache.streampipes.client.http.BinaryGetRequest;
+
+public class CachedFetchFileHandler implements FetchFileHandler {
+ @Override
+ public byte[] getFileContent(String filename, BinaryGetRequest apiRequest) {
+ return new byte[0];
+ }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index fab0dde2e..a784bf133 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -34,11 +34,15 @@ import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
+import jakarta.annotation.PreDestroy;
+
public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
+
public static void main(String[] args) {
new ConnectAdapterIiotInit().init();
}
@@ -67,4 +71,12 @@ public class ConnectAdapterIiotInit extends ExtensionsModelSubmitter {
.registerAdapter(new TubeMQProtocol())
.build();
}
+
+
+ @PreDestroy
+ @Override
+ public void onExit() {
+ super.onExit();
+ FileProtocolUtils.deleteServiceStorageDir();
+ }
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
index e7d697c58..3657ae74d 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -22,6 +22,10 @@ import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -30,6 +34,8 @@ import java.io.InputStream;
public class FileProtocolUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(FileProtocolUtils.class);
+
public static InputStream getFileInputStream(String selectedFilename) throws FileNotFoundException {
if (!isFilePresent(selectedFilename)) {
try {
@@ -42,6 +48,21 @@ public class FileProtocolUtils {
return new FileInputStream(makeFileLoc(selectedFilename));
}
+ public static void deleteServiceStorageDir() {
+ var directoryLocation = makeServiceStorageDir();
+ File directory = new File(directoryLocation);
+ if (directory.exists()) {
+ try {
+ FileUtils.deleteDirectory(directory);
+ LOG.info("Directory %s was removed sucessfully".formatted(directoryLocation));
+ } catch (IOException e) {
+ LOG.error("There was a problem removing the directory %s".formatted(directoryLocation), e);
+ }
+ } else {
+ LOG.info("Tried to delete directory %s but it did not exist".formatted(directoryLocation));
+ }
+ }
+
private static boolean isFilePresent(String selectedFilename) {
File file = new File(makeFileLoc(selectedFilename));
return file.exists();
@@ -65,6 +86,7 @@ public class FileProtocolUtils {
+ "service";
}
+
private static String makeFileLoc(String filename) {
return makeServiceStorageDir() + File.separator + filename;
}
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index db9332361..ada515923 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.extensions.all.jvm;
import org.apache.streampipes.connect.iiot.ConnectAdapterIiotInit;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -31,6 +32,7 @@ import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.pe.jvm.AllPipelineElementsInit;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
+import jakarta.annotation.PreDestroy;
public class AllExtensionsInit extends ExtensionsModelSubmitter {
@@ -58,4 +60,11 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
new SpNatsProtocolFactory())
.build();
}
+
+ @PreDestroy
+ @Override
+ public void onExit() {
+ super.onExit();
+ FileProtocolUtils.deleteServiceStorageDir();
+ }
}
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
index a04050bdb..161933511 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentController.java
@@ -22,7 +22,9 @@ import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
+import org.apache.streampipes.extensions.management.CacheFilesUtils;
import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.extensions.management.util.CachedFetchFileHandler;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -183,7 +185,13 @@ public class CsvMetadataEnrichmentController
private String getFileContents(AbstractParameterExtractor<?> extractor) {
String filename = extractor.selectedFilename(CSV_FILE_KEY);
- return getStreamPipesClientInstance().fileApi().getFileContentAsString(filename);
+ var handler = new CachedFetchFileHandler();
+ var fileUtils = CacheFilesUtils.getFileContent(filename, getStreamPipesClientInstance());
+
+
+ var result = getStreamPipesClientInstance().fileApi().getFileContent(filename, handler);
+
+ return "";
}
private StreamPipesClient getStreamPipesClientInstance() {