You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2020/10/27 13:38:31 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1350 Support serializers in MergeContent
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 674fad7 MINIFICPP-1350 Support serializers in MergeContent
674fad7 is described below
commit 674fad7fdc082840bfd17bececb4fa5ac7678468
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Oct 27 14:36:38 2020 +0100
MINIFICPP-1350 Support serializers in MergeContent
This closes #900
Signed-off-by: Marton Szasz <sz...@gmail.com>
---
extensions/bustache/ApplyTemplate.h | 2 +-
extensions/civetweb/processors/ListenHTTP.cpp | 2 +-
extensions/civetweb/processors/ListenHTTP.h | 4 +-
extensions/http-curl/client/HTTPCallback.h | 2 +-
.../http-curl/tests/unit/InvokeHTTPTests.cpp | 2 +-
extensions/jni/jvm/JniReferenceObjects.h | 4 +-
extensions/libarchive/CompressContent.h | 10 +-
extensions/libarchive/FocusArchiveEntry.cpp | 2 +-
extensions/libarchive/FocusArchiveEntry.h | 2 +-
extensions/libarchive/MergeContent.cpp | 120 +++++++------
extensions/libarchive/MergeContent.h | 199 ++++++++++-----------
extensions/libarchive/UnfocusArchiveEntry.cpp | 2 +-
extensions/libarchive/UnfocusArchiveEntry.h | 2 +-
extensions/librdkafka/PublishKafka.cpp | 2 +-
extensions/mqtt/processors/ConsumeMQTT.h | 2 +-
extensions/mqtt/processors/ConvertJSONAck.h | 2 +-
extensions/mqtt/processors/PublishMQTT.h | 2 +-
extensions/opc/include/fetchopc.h | 2 +-
extensions/opc/include/putopc.h | 2 +-
extensions/opc/src/putopc.cpp | 2 +-
extensions/opencv/CaptureRTSPFrame.h | 2 +-
extensions/opencv/FrameIO.h | 4 +-
.../SourceInitiatedSubscriptionListener.cpp | 2 +-
.../SourceInitiatedSubscriptionListener.h | 2 +-
extensions/script/lua/LuaProcessSession.h | 4 +-
extensions/script/python/PyProcessSession.h | 4 +-
extensions/sensors/SensorBase.h | 2 +-
extensions/sftp/processors/FetchSFTP.cpp | 2 +-
extensions/sftp/processors/FetchSFTP.h | 2 +-
extensions/sftp/processors/PutSFTP.cpp | 2 +-
extensions/sftp/processors/PutSFTP.h | 2 +-
extensions/sql/data/WriteCallback.h | 2 +-
extensions/sqlite/ExecuteSQL.cpp | 2 +-
extensions/sqlite/ExecuteSQL.h | 2 +-
extensions/sqlite/PutSQL.cpp | 2 +-
extensions/sqlite/PutSQL.h | 2 +-
.../processors/ExecuteProcess.h | 2 +-
.../standard-processors/processors/ExtractText.cpp | 2 +-
.../standard-processors/processors/ExtractText.h | 2 +-
.../processors/GenerateFlowFile.h | 2 +-
extensions/standard-processors/processors/GetTCP.h | 2 +-
.../standard-processors/processors/HashContent.cpp | 2 +-
.../standard-processors/processors/HashContent.h | 2 +-
.../standard-processors/processors/ListenSyslog.h | 2 +-
.../standard-processors/processors/LogAttribute.h | 2 +-
.../standard-processors/processors/PutFile.cpp | 2 +-
.../standard-processors/processors/PutFile.h | 2 +-
.../standard-processors/processors/TailFile.cpp | 4 +-
extensions/tensorflow/TFApplyGraph.cpp | 6 +-
extensions/tensorflow/TFApplyGraph.h | 6 +-
extensions/tensorflow/TFConvertImageToTensor.cpp | 4 +-
extensions/tensorflow/TFConvertImageToTensor.h | 4 +-
extensions/tensorflow/TFExtractTopLabels.cpp | 4 +-
extensions/tensorflow/TFExtractTopLabels.h | 4 +-
extensions/usb-camera/GetUSBCamera.cpp | 4 +-
extensions/usb-camera/GetUSBCamera.h | 4 +-
.../CollectorInitiatedSubscription.cpp | 2 +-
.../windows-event-log/ConsumeWindowsEventLog.cpp | 2 +-
libminifi/CMakeLists.txt | 2 +-
libminifi/include/FlowFileRecord.h | 16 +-
libminifi/include/core/ProcessSession.h | 2 +-
.../include/core/ProcessSessionReadCallback.h | 2 +-
libminifi/include/io/StreamPipe.h | 110 ++++++++++++
.../include/serialization/FlowFileSerializer.h | 39 ++--
.../include/serialization/FlowFileV3Serializer.h | 27 +--
.../include/serialization/PayloadSerializer.h | 23 +--
libminifi/include/sitetosite/SiteToSiteClient.h | 4 +-
libminifi/include/utils/ByteArrayCallback.h | 6 +-
libminifi/include/utils/FileOutputCallback.h | 2 +-
libminifi/src/core/ProcessSession.cpp | 8 +-
libminifi/src/core/ProcessSessionReadCallback.cpp | 2 +-
.../src/serialization/FlowFileV3Serializer.cpp | 113 ++++++++++++
.../src/serialization/PayloadSerializer.cpp | 26 +--
libminifi/src/utils/ByteArrayCallback.cpp | 4 +-
libminifi/src/utils/FileOutputCallback.cpp | 2 +-
libminifi/test/BufferReader.h | 2 +-
.../test/archive-tests/CompressContentTests.cpp | 2 +-
libminifi/test/archive-tests/MergeFileTests.cpp | 99 +++++++++-
libminifi/test/unit/FlowFileSerializationTests.cpp | 85 +++++++++
79 files changed, 716 insertions(+), 331 deletions(-)
diff --git a/extensions/bustache/ApplyTemplate.h b/extensions/bustache/ApplyTemplate.h
index 739d02a..e743ca8 100644
--- a/extensions/bustache/ApplyTemplate.h
+++ b/extensions/bustache/ApplyTemplate.h
@@ -61,7 +61,7 @@ class ApplyTemplate : public core::Processor {
class WriteCallback : public OutputStreamCallback {
public:
WriteCallback(const std::string &templateFile, const std::shared_ptr<core::FlowFile> &flow_file);
- int64_t process(std::shared_ptr<io::BaseStream> stream);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream);
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 19befa4..39b7e8e 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -503,7 +503,7 @@ ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> reque
: request_content_(std::move(request_content)) {
}
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ListenHTTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write(const_cast<uint8_t*>(request_content_->getBuffer()), request_content_->size());
}
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index c774b5e..1904e20 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -129,7 +129,7 @@ class ListenHTTP : public core::Processor {
explicit ResponseBodyReadCallback(std::string *out_str)
: out_str_(out_str) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
out_str_->resize(stream->size());
uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
gsl::narrow<int>(stream->size()));
@@ -149,7 +149,7 @@ class ListenHTTP : public core::Processor {
class WriteCallback : public OutputStreamCallback {
public:
WriteCallback(std::unique_ptr<io::BufferStream>);
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::unique_ptr<io::BufferStream> request_content_;
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 5415979..827de10 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -78,7 +78,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
seekInner(lock, pos);
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
std::vector<char> vec;
if (stream->size() > 0) {
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 4639f3e..14cd7f0 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -150,7 +150,7 @@ class CallBack : public minifi::OutputStreamCallback {
}
virtual ~CallBack() {
}
- virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+ virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
// leaving the typo for posterity sake
std::string st = "we're gnna write some test stuff";
return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 2e9205b..7f50744 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -103,7 +103,7 @@ class JniByteOutStream : public minifi::OutputStreamCallback {
}
virtual ~JniByteOutStream() = default;
- virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+ virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
return stream->write((uint8_t*) bytes_, length_);
}
private:
@@ -126,7 +126,7 @@ class JniByteInputStream : public minifi::InputStreamCallback {
if (buffer_)
delete[] buffer_;
}
- int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
stream_ = stream;
return 0;
}
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 66541de..8cda1d6 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -85,7 +85,7 @@ public:
flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
}
~ReadCallbackCompress() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
uint8_t buffer[4096U];
int64_t ret = 0;
uint64_t read_size = 0;
@@ -130,7 +130,7 @@ public:
origin_offset_ = flow_->getOffset();
}
~ReadCallbackDecompress() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
read_size_ = 0;
stream->seek(offset_);
int readRet = stream->read(buffer_, sizeof(buffer_));
@@ -208,7 +208,7 @@ public:
archive_read_free(arch);
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
struct archive *arch;
int r;
@@ -361,7 +361,7 @@ public:
std::shared_ptr<core::ProcessSession> session_;
bool success_{false};
- int64_t process(std::shared_ptr<io::BaseStream> outputStream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& outputStream) override {
class ReadCallback : public InputStreamCallback {
public:
ReadCallback(GzipWriteCallback& writer, std::shared_ptr<io::OutputStream> outputStream)
@@ -369,7 +369,7 @@ public:
, outputStream_(std::move(outputStream)) {
}
- int64_t process(std::shared_ptr<io::BaseStream> inputStream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override {
std::vector<uint8_t> buffer(16 * 1024U);
int64_t read_size = 0;
while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index ccce67c..ebe1721 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -166,7 +166,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
return read;
}
-int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
auto inputArchive = archive_read_new();
struct archive_entry *entry;
int64_t nlen = 0;
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 8603f9d..e475c75 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -71,7 +71,7 @@ class FocusArchiveEntry : public core::Processor {
public:
explicit ReadCallback(core::Processor*, fileutils::FileManager *file_man, ArchiveMetadata *archiveMetadata);
~ReadCallback();
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
bool isRunning() {return proc_->isRunning();}
private:
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 1b195c2..e702eb5 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -34,6 +34,8 @@
#include "utils/GeneralUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
+#include "serialization/PayloadSerializer.h"
+#include "serialization/FlowFileV3Serializer.h"
namespace org {
namespace apache {
@@ -49,7 +51,11 @@ core::Property MergeContent::MergeStrategy(
core::Property MergeContent::MergeFormat(
core::PropertyBuilder::createProperty("Merge Format")
->withDescription("Merge Format")
- ->withAllowableValues<std::string>({merge_content_options::MERGE_FORMAT_CONCAT_VALUE, merge_content_options::MERGE_FORMAT_TAR_VALUE, merge_content_options::MERGE_FORMAT_ZIP_VALUE})
+ ->withAllowableValues<std::string>({
+ merge_content_options::MERGE_FORMAT_CONCAT_VALUE,
+ merge_content_options::MERGE_FORMAT_TAR_VALUE,
+ merge_content_options::MERGE_FORMAT_ZIP_VALUE,
+ merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE})
->withDefaultValue(merge_content_options::MERGE_FORMAT_CONCAT_VALUE)->build());
core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");
core::Property MergeContent::DelimiterStrategy(
@@ -73,9 +79,6 @@ core::Property MergeContent::AttributeStrategy(
->withAllowableValues<std::string>({merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON, merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE})
->withDefaultValue(merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)->build());
core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");
-const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";
-const char *TarMerge::mimeType = "application/tar";
-const char *ZipMerge::mimeType = "application/zip";
void MergeContent::initialize() {
// Set the supported properties
@@ -118,35 +121,17 @@ std::string MergeContent::readContent(std::string path) {
}
void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- std::string value;
BinFiles::onSchedule(context, sessionFactory);
- if (context->getProperty(MergeStrategy.getName(), value) && !value.empty()) {
- mergeStrategy_ = value;
- }
- if (context->getProperty(MergeFormat.getName(), value) && !value.empty()) {
- mergeFormat_ = value;
- }
- if (context->getProperty(CorrelationAttributeName.getName(), value) && !value.empty()) {
- correlationAttributeName_ = value;
- }
- if (context->getProperty(DelimiterStrategy.getName(), value) && !value.empty()) {
- delimiterStrategy_ = value;
- }
- if (context->getProperty(Header.getName(), value) && !value.empty()) {
- header_ = value;
- }
- if (context->getProperty(Footer.getName(), value) && !value.empty()) {
- footer_ = value;
- }
- if (context->getProperty(Demarcator.getName(), value) && !value.empty()) {
- demarcator_ = value;
- }
- if (context->getProperty(KeepPath.getName(), value) && !value.empty()) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, keepPath_);
- }
- if (context->getProperty(AttributeStrategy.getName(), value) && !value.empty()) {
- attributeStrategy_ = value;
- }
+
+ context->getProperty(MergeStrategy.getName(), mergeStrategy_);
+ context->getProperty(MergeFormat.getName(), mergeFormat_);
+ context->getProperty(CorrelationAttributeName.getName(), correlationAttributeName_);
+ context->getProperty(DelimiterStrategy.getName(), delimiterStrategy_);
+ context->getProperty(Header.getName(), header_);
+ context->getProperty(Footer.getName(), footer_);
+ context->getProperty(Demarcator.getName(), demarcator_);
+ context->getProperty(KeepPath.getName(), keepPath_);
+ context->getProperty(AttributeStrategy.getName(), attributeStrategy_);
validatePropertyOptions();
@@ -155,6 +140,19 @@ void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessio
}
logger_->log_debug("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStrategy_, mergeFormat_, correlationAttributeName_, delimiterStrategy_);
logger_->log_debug("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
+
+ if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
+ if (!header_.empty()) {
+ logger_->log_warn("Header property only works with the Binary Concatenation format, value [%s] is ignored", header_);
+ }
+ if (!footer_.empty()) {
+ logger_->log_warn("Footer property only works with the Binary Concatenation format, value [%s] is ignored", footer_);
+ }
+ if (!demarcator_.empty()) {
+ logger_->log_warn("Demarcator property only works with the Binary Concatenation format, value [%s] is ignored", demarcator_);
+ }
+ }
+
if (delimiterStrategy_ == merge_content_options::DELIMITER_STRATEGY_FILENAME) {
if (!header_.empty()) {
headerContent_ = readContent(header_);
@@ -182,7 +180,8 @@ void MergeContent::validatePropertyOptions() {
if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE &&
mergeFormat_ != merge_content_options::MERGE_FORMAT_TAR_VALUE &&
- mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
+ mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE &&
+ mergeFormat_ != merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
logger_->log_error("Merge format not supported %s", mergeFormat_);
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge format: " + mergeFormat_);
}
@@ -298,20 +297,36 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
return false;
}
+ auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+ return session->read(ff, cb);
+ };
+
+ const char* mimeType;
std::unique_ptr<MergeBin> mergeBin;
- if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE)
- mergeBin = utils::make_unique<BinaryConcatenationMerge>();
- else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE)
+ std::unique_ptr<minifi::FlowFileSerializer> serializer = utils::make_unique<PayloadSerializer>(flowFileReader);
+ if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
+ mergeBin = utils::make_unique<BinaryConcatenationMerge>(headerContent_, footerContent_, demarcatorContent_);
+ mimeType = "application/octet-stream";
+ } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
+ // disregard header, demarcator, footer
+ mergeBin = utils::make_unique<BinaryConcatenationMerge>("", "", "");
+ serializer = utils::make_unique<FlowFileV3Serializer>(flowFileReader);
+ mimeType = "application/flowfile-v3";
+ } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
mergeBin = utils::make_unique<TarMerge>();
- else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE)
+ mimeType = "application/tar";
+ } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
mergeBin = utils::make_unique<ZipMerge>();
- else {
+ mimeType = "application/zip";
+ } else {
logger_->log_error("Merge format not supported %s", mergeFormat_);
return false;
}
+ std::shared_ptr<core::FlowFile> mergeFlow;
try {
- mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_, demarcatorContent_, merge_flow);
+ mergeBin->merge(context, session, bin->getFlowFile(), *serializer, merge_flow);
+ session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
} catch (...) {
logger_->log_error("Merge Content merge catch exception");
return false;
@@ -329,12 +344,15 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
return true;
}
-void BinaryConcatenationMerge::merge(core::ProcessContext*, core::ProcessSession *session,
- std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator,
- const std::shared_ptr<core::FlowFile> &merge_flow) {
- BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session);
+BinaryConcatenationMerge::BinaryConcatenationMerge(const std::string &header, const std::string& footer, const std::string &demarcator)
+ : header_(header),
+ footer_(footer),
+ demarcator_(demarcator) {}
+
+void BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+ BinaryConcatenationMerge::WriteCallback callback(header_, footer_, demarcator_, flows, serializer);
session->write(merge_flow, &callback);
- session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
std::string fileName;
if (flows.size() == 1) {
flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
@@ -345,11 +363,10 @@ void BinaryConcatenationMerge::merge(core::ProcessContext*, core::ProcessSession
session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
}
-void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
- std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) {
- ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, session);
+void TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+ ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, serializer);
session->write(merge_flow, &callback);
- session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
std::string fileName;
merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
if (flows.size() == 1) {
@@ -363,11 +380,10 @@ void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::
}
}
-void ZipMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
- std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) {
- ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, session);
+void ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+ ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, serializer);
session->write(merge_flow, &callback);
- session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
std::string fileName;
merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
if (flows.size() == 1) {
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index 99cf457..cfa067a 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -25,6 +25,7 @@
#include "archive_entry.h"
#include "archive.h"
#include "core/logging/LoggerConfiguration.h"
+#include "serialization/FlowFileSerializer.h"
namespace org {
namespace apache {
@@ -38,11 +39,8 @@ constexpr const char *MERGE_STRATEGY_BIN_PACK = "Bin-Packing Algorithm";
constexpr const char *MERGE_STRATEGY_DEFRAGMENT = "Defragment";
constexpr const char *MERGE_FORMAT_TAR_VALUE = "TAR";
constexpr const char *MERGE_FORMAT_ZIP_VALUE = "ZIP";
-constexpr const char *MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";
-constexpr const char *MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = "FlowFile Stream, v2";
-constexpr const char *MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile Tar, v1";
constexpr const char *MERGE_FORMAT_CONCAT_VALUE = "Binary Concatenation";
-constexpr const char *MERGE_FORMAT_AVRO_VALUE = "Avro";
+constexpr const char* MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";
constexpr const char *DELIMITER_STRATEGY_FILENAME = "Filename";
constexpr const char *DELIMITER_STRATEGY_TEXT = "Text";
constexpr const char *ATTRIBUTE_STRATEGY_KEEP_COMMON = "Keep Only Common Attributes";
@@ -52,63 +50,33 @@ constexpr const char *ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE = "Keep All Unique Attr
// MergeBin Class
class MergeBin {
-public:
-
+ public:
virtual ~MergeBin() = default;
- virtual std::string getMergedContentType() = 0;
// merge the flows in the bin
virtual void merge(core::ProcessContext *context, core::ProcessSession *session,
- std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator,
- const std::shared_ptr<core::FlowFile> &flowFile) = 0;
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0;
};
// BinaryConcatenationMerge Class
class BinaryConcatenationMerge : public MergeBin {
-public:
- static const char *mimeType;
- std::string getMergedContentType() override {
- return mimeType;
- }
- void merge(
- core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
- std::string &header, std::string &footer, std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
- // Nest Callback Class for read stream
- class ReadCallback : public InputStreamCallback {
- public:
- ReadCallback(uint64_t size, std::shared_ptr<io::BaseStream> stream)
- : buffer_size_(size), stream_(stream) {
- }
- ~ReadCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- uint8_t buffer[4096U];
- int64_t ret = 0;
- uint64_t read_size = 0;
- while (read_size < buffer_size_) {
- int readRet = stream->read(buffer, sizeof(buffer));
- if (readRet > 0) {
- ret += stream_->write(buffer, readRet);
- read_size += readRet;
- } else {
- break;
- }
- }
- return ret;
- }
- uint64_t buffer_size_;
- std::shared_ptr<io::BaseStream> stream_;
- };
+ public:
+ BinaryConcatenationMerge(const std::string& header, const std::string& footer, const std::string& demarcator);
+
+ void merge(core::ProcessContext *context, core::ProcessSession *session,
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile);
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
- public:
- WriteCallback(std::string &header, std::string &footer, std::string &demarcator, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
- header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), session_(session) {
+ public:
+ WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
+ std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) :
+ header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), serializer_(serializer) {
}
std::string &header_;
std::string &footer_;
std::string &demarcator_;
std::deque<std::shared_ptr<core::FlowFile>> &flows_;
- core::ProcessSession *session_;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ FlowFileSerializer& serializer_;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (!header_.empty()) {
int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(header_.data())), header_.size());
@@ -124,9 +92,10 @@ public:
return len;
ret += len;
}
- ReadCallback readCb(flow->getSize(), stream);
- session_->read(flow, &readCb);
- ret += flow->getSize();
+ int len = serializer_.serialize(flow, stream);
+ if (len < 0)
+ return len;
+ ret += len;
isFirst = false;
}
if (!footer_.empty()) {
@@ -138,46 +107,54 @@ public:
return ret;
}
};
+
+ private:
+ std::string header_;
+ std::string footer_;
+ std::string demarcator_;
};
// Archive Class
class ArchiveMerge {
-public:
- // Nest Callback Class for read stream
- class ReadCallback: public InputStreamCallback {
- public:
- ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
- buffer_size_(size), arch_(arch), entry_(entry) {
- }
- ~ReadCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- uint8_t buffer[4096U];
- int64_t ret = 0;
- uint64_t read_size = 0;
- ret = archive_write_header(arch_, entry_);
- while (read_size < buffer_size_) {
- int readRet = stream->read(buffer, sizeof(buffer));
- if (readRet > 0) {
- ret += archive_write_data(arch_, buffer, readRet);
- read_size += readRet;
+ public:
+ class ArchiveWriter : public io::OutputStream {
+ public:
+ ArchiveWriter(struct archive *arch, struct archive_entry *entry) : arch_(arch), entry_(entry) {}
+ int write(const uint8_t* data, int size) override {
+ if (!header_emitted_) {
+ if (archive_write_header(arch_, entry_) != ARCHIVE_OK) {
+ return -1;
+ }
+ header_emitted_ = true;
+ }
+ int totalWrote = 0;
+ int remaining = size;
+ while (remaining > 0) {
+ int ret = archive_write_data(arch_, data + totalWrote, remaining);
+ if (ret < 0) {
+ return ret;
}
- else {
+ if (ret == 0) {
break;
}
+ totalWrote += ret;
+ remaining -= ret;
}
- return ret;
+ return totalWrote;
}
- uint64_t buffer_size_;
+
+ private:
struct archive *arch_;
struct archive_entry *entry_;
+ bool header_emitted_{false};
};
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
- public:
- WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
- merge_type_(merge_type), flows_(flows), session_(session),
- logger_(logging::LoggerFactory<ArchiveMerge>::getLogger()) {
+ public:
+ WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
+ : merge_type_(merge_type), flows_(flows), serializer_(serializer),
+ logger_(logging::LoggerFactory<ArchiveMerge>::getLogger()) {
size_ = 0;
stream_ = nullptr;
}
@@ -185,20 +162,33 @@ public:
std::string merge_type_;
std::deque<std::shared_ptr<core::FlowFile>> &flows_;
- core::ProcessSession *session_;
std::shared_ptr<io::BaseStream> stream_;
- int64_t size_;
+ size_t size_;
std::shared_ptr<logging::Logger> logger_;
+ FlowFileSerializer& serializer_;
static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
WriteCallback *callback = (WriteCallback *) context;
- la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size);
- if (ret > 0)
- callback->size_ += (int64_t) ret;
- return ret;
+ uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff));
+ la_ssize_t totalWrote = 0;
+ size_t remaining = size;
+ while (remaining > 0) {
+ la_ssize_t ret = callback->stream_->write(data + totalWrote, remaining);
+ if (ret < 0) {
+ // libarchive expects us to return -1 on error
+ return -1;
+ }
+ if (ret == 0) {
+ break;
+ }
+ callback->size_ += ret;
+ totalWrote += ret;
+ remaining -= ret;
+ }
+ return totalWrote;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
struct archive *arch;
arch = archive_write_new();
@@ -232,8 +222,10 @@ public:
}
}
}
- ReadCallback readCb(flow->getSize(), arch, entry);
- session_->read(flow, &readCb);
+ int ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry));
+ if (ret < 0) {
+ return ret;
+ }
archive_entry_free(entry);
}
@@ -246,33 +238,26 @@ public:
// TarMerge Class
class TarMerge: public ArchiveMerge, public MergeBin {
-public:
- static const char *mimeType;
- void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer,
- std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
- std::string getMergedContentType() override {
- return mimeType;
- }
+ public:
+ void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
+ FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
// ZipMerge Class
class ZipMerge: public ArchiveMerge, public MergeBin {
-public:
- static const char *mimeType;
- void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer,
- std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
- std::string getMergedContentType() override {
- return mimeType;
- }
+ public:
+ void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
+ FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
class AttributeMerger {
-public:
+ public:
explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
: flows_(flows) {}
void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow);
virtual ~AttributeMerger() = default;
-protected:
+
+ protected:
std::map<std::string, std::string> getMergedAttributes();
virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0;
@@ -280,27 +265,29 @@ protected:
};
class KeepOnlyCommonAttributesMerger: public AttributeMerger {
-public:
+ public:
explicit KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
: AttributeMerger(flows) {}
-protected:
+
+ protected:
void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;
};
class KeepAllUniqueAttributesMerger: public AttributeMerger {
-public:
+ public:
explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
: AttributeMerger(flows) {}
-protected:
+
+ protected:
void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;
-private:
+ private:
std::vector<std::string> removed_attributes_;
};
// MergeContent Class
class MergeContent : public processors::BinFiles {
-public:
+ public:
// Constructor
/*!
* Create a new processor
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index 16aec91..1569c34 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -158,7 +158,7 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *
return data->stream->write(const_cast<uint8_t*>(ui_buffer), length);
}
-int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
auto outputArchive = archive_write_new();
int64_t nlen = 0;
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index 9ed5809..904455d 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -70,7 +70,7 @@ class UnfocusArchiveEntry : public core::Processor {
class WriteCallback : public OutputStreamCallback {
public:
explicit WriteCallback(ArchiveMetadata *archiveMetadata);
- int64_t process(std::shared_ptr<io::BaseStream> stream);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream);
private:
//! Logger
std::shared_ptr<Logger> logger_;
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 5b007a3..30c8847 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -377,7 +377,7 @@ class ReadCallback : public InputStreamCallback {
ReadCallback(const ReadCallback&) = delete;
ReadCallback& operator=(ReadCallback) = delete;
- int64_t process(const std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
std::vector<unsigned char> buffer;
buffer.resize(max_seg_size_);
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index 18c0b33..b4e39a1 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -79,7 +79,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
status_ = 0;
}
MQTTClient_message *message_;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
if (len < 0)
status_ = -1;
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index c99cb1b..dc90cc3 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -73,7 +73,7 @@ class ConvertJSONAck : public ConvertBase {
public:
ReadCallback() = default;
~ReadCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (nullptr == stream)
return 0;
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index a3f4805..7925596 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -75,7 +75,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
read_size_ = 0;
}
~ReadCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
if (flow_size_ < max_seg_size_)
max_seg_size_ = flow_size_;
std::vector<unsigned char> buffer;
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index e8ff90c..cb60f31 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -82,7 +82,7 @@ protected:
WriteCallback(std::string&& data)
: data_(data) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
}
};
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index 8dfe61c..ec58de2 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -80,7 +80,7 @@ class PutOPCProcessor : public BaseOPCProcessor {
class ReadCallback : public InputStreamCallback {
public:
ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
const std::vector<uint8_t>& getContent() const { return buf_; }
private:
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 2214489..36f8739 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -421,7 +421,7 @@ namespace processors {
}
}
- int64_t PutOPCProcessor::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t PutOPCProcessor::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
buf_.clear();
buf_.resize(stream->size());
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index 5253f23..9a3f500 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -69,7 +69,7 @@ class CaptureRTSPFrame : public core::Processor {
}
~CaptureRTSPFrameWriteCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
int64_t ret = 0;
imencode(image_encoding_, image_mat_, image_buf_);
ret = stream->write(image_buf_.data(), image_buf_.size());
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
index 3359be9..bd3669e 100644
--- a/extensions/opencv/FrameIO.h
+++ b/extensions/opencv/FrameIO.h
@@ -32,7 +32,7 @@ class FrameWriteCallback : public OutputStreamCallback {
}
~FrameWriteCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
int64_t ret = 0;
imencode(image_encoding_, image_mat_, image_buf_);
ret = stream->write(image_buf_.data(), image_buf_.size());
@@ -52,7 +52,7 @@ class FrameReadCallback : public InputStreamCallback {
}
~FrameReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
int64_t ret = 0;
image_buf_.resize(stream->getSize());
ret = stream->read(image_buf_.data(), static_cast<int>(stream->getSize()));
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 8e697a0..9f14ea5 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -604,7 +604,7 @@ SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char*
: text_(text) {
}
-int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
}
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index d884e3e..9ba0990 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -93,7 +93,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
class WriteCallback : public OutputStreamCallback {
public:
explicit WriteCallback(char* text);
- int64_t process(std::shared_ptr<io::BaseStream> stream);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream);
private:
char* text_;
diff --git a/extensions/script/lua/LuaProcessSession.h b/extensions/script/lua/LuaProcessSession.h
index f0cbb4f..67d558b 100644
--- a/extensions/script/lua/LuaProcessSession.h
+++ b/extensions/script/lua/LuaProcessSession.h
@@ -59,7 +59,7 @@ class LuaProcessSession {
lua_callback_ = input_stream_callback;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
auto lua_stream = std::make_shared<LuaBaseStream>(stream);
sol::function callback = lua_callback_["process"];
return callback(lua_callback_, lua_stream);
@@ -75,7 +75,7 @@ class LuaProcessSession {
lua_callback_ = output_stream_callback;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
auto lua_stream = std::make_shared<LuaBaseStream>(stream);
sol::function callback = lua_callback_["process"];
return callback(lua_callback_, lua_stream);
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 00bad31..33f61a3 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -64,7 +64,7 @@ class PyProcessSession {
py_callback_ = input_stream_callback;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
auto py_stream = std::make_shared<PyBaseStream>(stream);
return py_callback_.attr("process")(py_stream).cast<int64_t>();
}
@@ -79,7 +79,7 @@ class PyProcessSession {
py_callback_ = output_stream_callback;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
auto py_stream = std::make_shared<PyBaseStream>(stream);
return py_callback_.attr("process")(py_stream).cast<int64_t>();
}
diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h
index e5c306d..19e7f18 100644
--- a/extensions/sensors/SensorBase.h
+++ b/extensions/sensors/SensorBase.h
@@ -71,7 +71,7 @@ class SensorBase : public core::Processor {
}
char *_data;
uint64_t _dataSize;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (_data && _dataSize > 0)
ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
index e9d05ea..d5bc1f9 100644
--- a/extensions/sftp/processors/FetchSFTP.cpp
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -157,7 +157,7 @@ FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
FetchSFTP::WriteCallback::~WriteCallback() = default;
-int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FetchSFTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
if (!client_.getFile(remote_file_, *stream)) {
throw client_.getLastError();
}
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
index 0b27af0..6beb40a 100644
--- a/extensions/sftp/processors/FetchSFTP.h
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -82,7 +82,7 @@ class FetchSFTP : public SFTPProcessorBase {
WriteCallback(const std::string& remote_file,
utils::SFTPClient& client);
~WriteCallback();
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index fc844a0..e9c1a8f 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -210,7 +210,7 @@ PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
PutSFTP::ReadCallback::~ReadCallback() = default;
-int64_t PutSFTP::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutSFTP::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
if (!client_.putFile(target_path_,
*stream,
conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index 3d2a115..dff0671 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -98,7 +98,7 @@ namespace processors {
utils::SFTPClient& client,
const std::string& conflict_resolution);
~ReadCallback();
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/sql/data/WriteCallback.h b/extensions/sql/data/WriteCallback.h
index ab961ad..5336cd0 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/extensions/sql/data/WriteCallback.h
@@ -33,7 +33,7 @@ public:
: data_(data) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
if (data_.empty())
return 0;
diff --git a/extensions/sqlite/ExecuteSQL.cpp b/extensions/sqlite/ExecuteSQL.cpp
index 1da955d..e42fc4e 100644
--- a/extensions/sqlite/ExecuteSQL.cpp
+++ b/extensions/sqlite/ExecuteSQL.cpp
@@ -172,7 +172,7 @@ void ExecuteSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
}
}
-int64_t ExecuteSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ExecuteSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
sql_->resize(stream->size());
auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
static_cast<int>(stream->size())));
diff --git a/extensions/sqlite/ExecuteSQL.h b/extensions/sqlite/ExecuteSQL.h
index 58a53a3..63b11ec 100644
--- a/extensions/sqlite/ExecuteSQL.h
+++ b/extensions/sqlite/ExecuteSQL.h
@@ -58,7 +58,7 @@ class ExecuteSQL : public core::Processor {
explicit SQLReadCallback(std::shared_ptr<std::string> sql)
: sql_(std::move(sql)) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<std::string> sql_;
diff --git a/extensions/sqlite/PutSQL.cpp b/extensions/sqlite/PutSQL.cpp
index 8694f4e..c9d7bc8 100644
--- a/extensions/sqlite/PutSQL.cpp
+++ b/extensions/sqlite/PutSQL.cpp
@@ -172,7 +172,7 @@ void PutSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
}
}
-int64_t PutSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
sql_->resize(stream->size());
auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
static_cast<int>(stream->size())));
diff --git a/extensions/sqlite/PutSQL.h b/extensions/sqlite/PutSQL.h
index 77d5aab..e071048 100644
--- a/extensions/sqlite/PutSQL.h
+++ b/extensions/sqlite/PutSQL.h
@@ -60,7 +60,7 @@ class PutSQL : public core::Processor {
explicit SQLReadCallback(std::shared_ptr<std::string> sql)
: sql_(std::move(sql)) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<std::string> sql_;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 8c6cdb3..49bf3a7 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -92,7 +92,7 @@ class ExecuteProcess : public core::Processor {
char *_data;
uint64_t _dataSize;
// void process(std::ofstream *stream) {
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (_data && _dataSize > 0)
ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index d2b1b3b..345b84a 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -112,7 +112,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
session->transfer(flowFile, Success);
}
-int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
uint64_t read_size = 0;
bool regex_mode;
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index e7df36f..10c9017 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -76,7 +76,7 @@ class ExtractText : public core::Processor {
public:
ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<logging::Logger> lgr);
~ReadCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream);
private:
std::shared_ptr<core::FlowFile> flowFile_;
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 23bc1eb..b5d7dfd 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -70,7 +70,7 @@ class GenerateFlowFile : public core::Processor {
WriteCallback(const std::vector<char>& data) : data_(data) { // NOLINT
}
std::vector<char> data_;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (data_.size() > 0)
ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), data_.size());
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index aaad66c..e582d44 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -92,7 +92,7 @@ class DataHandlerCallback : public OutputStreamCallback {
virtual ~DataHandlerCallback() = default;
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write(message_, size_);
}
diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp
index a7ab109..7a4bbf3 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -93,7 +93,7 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
session->transfer(flowFile, Success);
}
-int64_t HashContent::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t HashContent::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
// This throws in case algo is not found, but that's fine
parent_.logger_->log_trace("Searching for %s", parent_.algoName_);
auto algo = HashAlgos.at(parent_.algoName_);
diff --git a/extensions/standard-processors/processors/HashContent.h b/extensions/standard-processors/processors/HashContent.h
index d55a172..1467cfb 100644
--- a/extensions/standard-processors/processors/HashContent.h
+++ b/extensions/standard-processors/processors/HashContent.h
@@ -162,7 +162,7 @@ class HashContent : public core::Processor {
public:
ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
~ReadCallback() {}
- int64_t process(std::shared_ptr<io::BaseStream> stream);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream);
private:
std::shared_ptr<core::FlowFile> flowFile_;
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index bcf4f0d..fcf24a0 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -134,7 +134,7 @@ class ListenSyslog : public core::Processor {
}
uint8_t *_data;
uint64_t _dataSize;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (_data && _dataSize > 0)
ret = stream->write(_data, _dataSize);
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 190d3d1..d1df602 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -102,7 +102,7 @@ class LogAttribute : public core::Processor {
: logger_(std::move(logger))
, buffer_(size) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
if (buffer_.size() == 0U) {
return 0U;
}
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 5eaa332..bfc9490 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -242,7 +242,7 @@ PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::stri
}
// Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
// Copy file contents into tmp file
write_succeeded_ = false;
size_t size = 0;
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 6c4a0c8..f938483 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -80,7 +80,7 @@ class PutFile : public core::Processor {
public:
ReadCallback(const std::string &tmp_file, const std::string &dest_file);
~ReadCallback() override;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
bool commit();
private:
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 46bd952..547184a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -202,7 +202,7 @@ class FileReaderCallback : public OutputStreamCallback {
openFile(file_name, offset, input_stream_, logger_);
}
- int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
uint64_t num_bytes_written = 0;
@@ -281,7 +281,7 @@ class WholeFileReaderCallback : public OutputStreamCallback {
return checksum_;
}
- int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+ int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
std::array<char, BUFFER_SIZE> buffer;
io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index bc4aac6..4419caa 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -188,7 +188,7 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
}
}
-int64_t TFApplyGraph::GraphReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string graph_proto_buf;
graph_proto_buf.resize(stream->size());
auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]),
@@ -202,7 +202,7 @@ int64_t TFApplyGraph::GraphReadCallback::process(std::shared_ptr<io::BaseStream>
return num_read;
}
-int64_t TFApplyGraph::TensorReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string tensor_proto_buf;
tensor_proto_buf.resize(stream->size());
auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
@@ -216,7 +216,7 @@ int64_t TFApplyGraph::TensorReadCallback::process(std::shared_ptr<io::BaseStream
return num_read;
}
-int64_t TFApplyGraph::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
auto tensor_proto_buf = tensor_proto_->SerializeAsString();
auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFApplyGraph.h b/extensions/tensorflow/TFApplyGraph.h
index 3155b05..446b08d 100644
--- a/extensions/tensorflow/TFApplyGraph.h
+++ b/extensions/tensorflow/TFApplyGraph.h
@@ -64,7 +64,7 @@ class TFApplyGraph : public core::Processor {
: graph_def_(std::move(graph_def)) {
}
~GraphReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<tensorflow::GraphDef> graph_def_;
@@ -76,7 +76,7 @@ class TFApplyGraph : public core::Processor {
: tensor_proto_(std::move(tensor_proto)) {
}
~TensorReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
@@ -88,7 +88,7 @@ class TFApplyGraph : public core::Processor {
: tensor_proto_(std::move(tensor_proto)) {
}
~TensorWriteCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index bdc86e4..5f94548 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -317,7 +317,7 @@ void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContex
}
}
-int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
if (tensor_->AllocatedBytes() < stream->size()) {
throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
}
@@ -332,7 +332,7 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::B
return num_read;
}
-int64_t TFConvertImageToTensor::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFConvertImageToTensor::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
auto tensor_proto_buf = tensor_proto_->SerializeAsString();
auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
index 268a041..dc0a02b 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.h
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -70,7 +70,7 @@ class TFConvertImageToTensor : public core::Processor {
: tensor_(tensor) {
}
~ImageReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
tensorflow::Tensor *tensor_;
@@ -82,7 +82,7 @@ class TFConvertImageToTensor : public core::Processor {
: tensor_proto_(std::move(tensor_proto)) {
}
~TensorWriteCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index dab3071..9bcb067 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -123,7 +123,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
}
}
-int64_t TFExtractTopLabels::LabelsReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t total_read = 0;
std::string label;
uint64_t max_label_len = 65536;
@@ -152,7 +152,7 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(std::shared_ptr<io::Base
return total_read;
}
-int64_t TFExtractTopLabels::TensorReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string tensor_proto_buf;
tensor_proto_buf.resize(stream->size());
auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
diff --git a/extensions/tensorflow/TFExtractTopLabels.h b/extensions/tensorflow/TFExtractTopLabels.h
index 85b0244..7288e84 100644
--- a/extensions/tensorflow/TFExtractTopLabels.h
+++ b/extensions/tensorflow/TFExtractTopLabels.h
@@ -56,7 +56,7 @@ class TFExtractTopLabels : public core::Processor {
: labels_(std::move(labels)) {
}
~LabelsReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<std::vector<std::string>> labels_;
@@ -68,7 +68,7 @@ class TFExtractTopLabels : public core::Processor {
: tensor_proto_(std::move(tensor_proto)) {
}
~TensorReadCallback() override = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index ec2d40f..523da22 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -397,7 +397,7 @@ GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> wri
logger_(logging::LoggerFactory<PNGWriteCallback>::getLogger()) {
}
-int64_t GetUSBCamera::PNGWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::lock_guard<std::mutex> lock(*png_write_mtx_);
logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
@@ -461,7 +461,7 @@ GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
logger_(logging::LoggerFactory<RawWriteCallback>::getLogger()) {
}
-int64_t GetUSBCamera::RawWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t GetUSBCamera::RawWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
logger_->log_info("Writing %d bytes of raw capture data", frame_->data_bytes);
return stream->write(reinterpret_cast<uint8_t *>(frame_->data), frame_->data_bytes);
}
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 0699c7c..38c6884 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -98,7 +98,7 @@ class GetUSBCamera : public core::Processor {
class PNGWriteCallback : public OutputStreamCallback {
public:
PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height);
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
std::shared_ptr<std::mutex> png_write_mtx_;
@@ -113,7 +113,7 @@ class GetUSBCamera : public core::Processor {
class RawWriteCallback : public OutputStreamCallback {
public:
explicit RawWriteCallback(uvc_frame_t *frame);
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
uvc_frame_t *frame_;
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index 651bee4..55f741c 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -633,7 +633,7 @@ int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::Pro
status_ = 0;
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write((uint8_t*)&str_[0], str_.size());
}
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index db0f4b1..4a3d552 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -651,7 +651,7 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
: str_(str) {
}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), str_.size());
}
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 333fb4c..464e71e 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/provenance/*.cpp" "src/ut [...]
# manually add this as it might not yet be present when this executes
list(APPEND SOURCES "src/agent/agent_version.cpp")
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index ed738b4..445af89 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -39,6 +39,7 @@
#include "ResourceClaim.h"
#include "Connection.h"
#include "io/OutputStream.h"
+#include "io/StreamPipe.h"
namespace org {
namespace apache {
@@ -47,21 +48,6 @@ namespace minifi {
#define DEFAULT_FLOWFILE_PATH "."
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-class InputStreamCallback {
- public:
- virtual ~InputStreamCallback() = default;
- // virtual void process(std::ifstream *stream) = 0;
-
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
-};
-class OutputStreamCallback {
- public:
- virtual ~OutputStreamCallback() = default;
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
-};
-
namespace core {
class ProcessSession;
}
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 125439c..c429570 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -93,7 +93,7 @@ class ProcessSession : public ReferenceContainer {
// Remove Flow File
void remove(const std::shared_ptr<core::FlowFile> &flow);
// Execute the given read callback against the content
- void read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
+ int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
// Execute the given write callback against the content
void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
// Execute the given write/append callback against the content
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h b/libminifi/include/core/ProcessSessionReadCallback.h
index 9ef7c6a..00a6f93 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -37,7 +37,7 @@ class ProcessSessionReadCallback : public InputStreamCallback {
ProcessSessionReadCallback(const std::string &tmpFile, const std::string &destFile,
std::shared_ptr<logging::Logger> logger);
~ProcessSessionReadCallback();
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
bool commit();
private:
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
new file mode 100644
index 0000000..b3fafce
--- /dev/null
+++ b/libminifi/include/io/StreamPipe.h
@@ -0,0 +1,110 @@
+/**
+ * @file StreamPipe.h
+ * Stream callback interfaces and stream-to-stream pipe helpers
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include "BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// FlowFile IO callback interfaces: InputStreamCallback consumes content, OutputStreamCallback produces it.
+// Errors are signalled by throwing; ProcessSession::read also treats a negative return from process() as a failure.
+class InputStreamCallback {
+ public:
+ virtual ~InputStreamCallback() = default;  // virtual so implementations can be destroyed through this interface
+
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;  // stream is taken by const reference, so no shared_ptr copy is made per call
+};
+class OutputStreamCallback {  // write-side counterpart of InputStreamCallback; ProcessSession::write takes a pointer to this type
+ public:
+ virtual ~OutputStreamCallback() = default;  // virtual so implementations can be destroyed through this interface
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;  // NOTE(review): return value presumably a byte count, negative on error - confirm against ProcessSession::write
+};
+
+namespace internal {
+
+/**
+ * Copies src into dst through a 4096-byte stack buffer until src reports
+ * end of data (a read that returns 0).
+ *
+ * @param src stream to read from
+ * @param dst stream to write to
+ * @return the sum of the values returned by the successful writes, or the
+ *         negative value returned by the first failing read or write
+ */
+inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
+  uint8_t chunk[4096U];
+  int64_t total = 0;
+  for (;;) {
+    const int bytesRead = src->read(chunk, sizeof(chunk));
+    if (bytesRead < 0) {
+      return bytesRead;
+    }
+    if (bytesRead == 0) {
+      return total;
+    }
+    // A single write may accept only part of the chunk, so keep writing the rest.
+    int offset = 0;
+    while (offset < bytesRead) {
+      const int bytesWritten = dst->write(chunk + offset, bytesRead - offset);
+      // TODO(adebreceni):
+      //   write might return 0, e.g. in case of a congested server
+      //   what should we return then?
+      //     - the number of bytes read or
+      //     - the number of bytes wrote
+      // NOTE: as written, a write that returns 0 makes no progress and is simply retried.
+      if (bytesWritten < 0) {
+        return bytesWritten;
+      }
+      offset += bytesWritten;
+    }
+    total += offset;
+  }
+}
+
+} // namespace internal
+
+/**
+ * Read callback that forwards everything readable from the stream it is
+ * handed into the output stream supplied at construction.
+ */
+class InputStreamPipe : public InputStreamCallback {
+ public:
+  explicit InputStreamPipe(std::shared_ptr<io::OutputStream> output)
+      : output_(std::move(output)) {
+  }
+
+  // Returns whatever internal::pipe reports for the transfer.
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    const int64_t transferred = internal::pipe(stream, output_);
+    return transferred;
+  }
+
+ private:
+  // Destination of the piped data.
+  std::shared_ptr<io::OutputStream> output_;
+};
+
+/**
+ * Write callback that fills the stream it is handed with everything
+ * readable from the input stream supplied at construction.
+ */
+class OutputStreamPipe : public OutputStreamCallback {
+ public:
+  explicit OutputStreamPipe(std::shared_ptr<io::InputStream> input)
+      : input_(std::move(input)) {
+  }
+
+  // Returns whatever internal::pipe reports for the transfer.
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    const int64_t transferred = internal::pipe(input_, stream);
+    return transferred;
+  }
+
+ private:
+  // Source of the piped data.
+  std::shared_ptr<io::InputStream> input_;
+};
+
+
+
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/FlowFileSerializer.h
similarity index 61%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/FlowFileSerializer.h
index ab961ad..02f21e4 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/FlowFileSerializer.h
@@ -18,29 +18,40 @@
#pragma once
-#include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include <utility>
+#include <functional>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
+namespace io {
+
+class OutputStream;
+
+} /* namespace io */
+
+namespace core {
+
+class FlowFile;
+
+} /* namespace core */
+
+class InputStreamCallback;
+
+class FlowFileSerializer {
+ public:
+ using FlowFileReader = std::function<int(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;
-class WriteCallback : public OutputStreamCallback {
-public:
- explicit WriteCallback(const std::string& data)
- : data_(data) {
- }
+ explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {}
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- if (data_.empty())
- return 0;
+ virtual int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0;
- return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
- }
+ virtual ~FlowFileSerializer() = default;
- const std::string& data_;
+ protected:
+ FlowFileReader reader_;
};
} /* namespace minifi */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/FlowFileV3Serializer.h
similarity index 59%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/FlowFileV3Serializer.h
index ab961ad..a6eb2cf 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/FlowFileV3Serializer.h
@@ -18,29 +18,30 @@
#pragma once
+#include <limits>
#include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include "io/OutputStream.h"
+#include "FlowFileSerializer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-class WriteCallback : public OutputStreamCallback {
-public:
- explicit WriteCallback(const std::string& data)
- : data_(data) {
- }
+class FlowFileV3Serializer : public FlowFileSerializer {
+ static constexpr uint8_t MAGIC_HEADER[] = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
+
+ static constexpr uint16_t MAX_2_BYTE_VALUE = (std::numeric_limits<uint16_t>::max)();
+
+ static int writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out);
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- if (data_.empty())
- return 0;
+ static int writeString(const std::string& str, const std::shared_ptr<io::OutputStream>& out);
- return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
- }
+ public:
+ using FlowFileSerializer::FlowFileSerializer;
- const std::string& data_;
+ int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
};
} /* namespace minifi */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/PayloadSerializer.h
similarity index 70%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/PayloadSerializer.h
index ab961ad..ed26e85 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/PayloadSerializer.h
@@ -18,29 +18,20 @@
#pragma once
-#include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include "io/OutputStream.h"
+#include "FlowFileSerializer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-class WriteCallback : public OutputStreamCallback {
-public:
- explicit WriteCallback(const std::string& data)
- : data_(data) {
- }
-
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- if (data_.empty())
- return 0;
-
- return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
- }
+class PayloadSerializer : public FlowFileSerializer {
+ public:
+ using FlowFileSerializer::FlowFileSerializer;
- const std::string& data_;
+ int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
};
} /* namespace minifi */
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 18d2285..df967ee 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -290,7 +290,7 @@ class WriteCallback : public OutputStreamCallback {
}
DataPacket *_packet;
// void process(std::ofstream *stream) {
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
uint8_t buffer[16384];
uint64_t len = _packet->_size;
uint64_t total = 0;
@@ -316,7 +316,7 @@ class ReadCallback : public InputStreamCallback {
: _packet(packet) {
}
DataPacket *_packet;
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
_packet->_size = 0;
uint8_t buffer[8192] = { 0 };
int readSize;
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 1639bf9..cbcdb4c 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -42,7 +42,7 @@ class ByteInputCallBack : public InputStreamCallback {
virtual ~ByteInputCallBack() = default;
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
stream->seek(0);
if (stream->size() > 0) {
@@ -107,7 +107,7 @@ class ByteOutputCallback : public OutputStreamCallback {
close();
}
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
virtual const std::vector<char> to_string();
@@ -155,7 +155,7 @@ class StreamOutputCallback : public ByteOutputCallback {
virtual void write(char *data, size_t size);
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
};
} // namespace utils
diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h
index dfc906f..820ebe8 100644
--- a/libminifi/include/utils/FileOutputCallback.h
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -49,7 +49,7 @@ class FileOutputCallback : public ByteOutputCallback {
virtual ~FileOutputCallback() = default;
- int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
const std::vector<char> to_string() override;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 2abf4cf..5309c8e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -281,7 +281,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
}
}
-void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
+int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
try {
std::shared_ptr<ResourceClaim> claim = nullptr;
@@ -289,7 +289,7 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
// No existed claim for read, we throw exception
logger_->log_debug("For %s, no resource claim but size is %d", flow->getUUIDStr(), flow->getSize());
if (flow->getSize() == 0) {
- return;
+ return 0;
}
throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
}
@@ -304,9 +304,11 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
stream->seek(flow->getOffset());
- if (callback->process(stream) < 0) {
+ int ret = callback->process(stream);
+ if (ret < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
+ return ret;
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index d0009bd..ff30f07 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -41,7 +41,7 @@ ProcessSessionReadCallback::ProcessSessionReadCallback(const std::string &tmpFil
}
// Copy the entire file contents to the temporary file
-int64_t ProcessSessionReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
// Copy file contents into tmp file
_writeSucceeded = false;
size_t size = 0;
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp
new file mode 100644
index 0000000..34a1087
--- /dev/null
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "serialization/FlowFileV3Serializer.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+constexpr uint8_t FlowFileV3Serializer::MAGIC_HEADER[];
+
+/**
+ * Writes a length field of the FlowFile v3 stream.
+ * A value below MAX_2_BYTE_VALUE is written as one 16-bit integer. Any other
+ * value is written as the 16-bit marker MAX_2_BYTE_VALUE followed by the value
+ * as a 32-bit integer.
+ *
+ * @param length value to encode
+ * @param out stream that receives the encoded field
+ * @return the sum of the values returned by the stream writes, the negative
+ *         value of a failed write, or -1 if length does not fit in 32 bits
+ */
+int FlowFileV3Serializer::writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out) {
+  if (length < MAX_2_BYTE_VALUE) {
+    return out->write(static_cast<uint16_t>(length));
+  }
+  // The extended form has only 32 bits for the value; fail instead of
+  // emitting a silently truncated length that would corrupt the stream.
+  if (static_cast<uint64_t>(length) > (std::numeric_limits<uint32_t>::max)()) {
+    return -1;
+  }
+  int sum = 0;
+  int ret;
+  ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
+  if (ret < 0) {
+    return ret;
+  }
+  sum += ret;
+  ret = out->write(static_cast<uint32_t>(length));
+  if (ret < 0) {
+    return ret;
+  }
+  sum += ret;
+  return sum;
+}
+
+/**
+ * Writes a string as a length field (see writeLength) followed by its raw bytes.
+ *
+ * @param str string to write
+ * @param out stream that receives the data
+ * @return the sum of the values returned by the two writes, the negative value
+ *         of a failed write, or -1 if the stream accepted only part of the bytes
+ */
+int FlowFileV3Serializer::writeString(const std::string &str, const std::shared_ptr<io::OutputStream> &out) {
+  const int prefixBytes = writeLength(str.length(), out);
+  if (prefixBytes < 0) {
+    return prefixBytes;
+  }
+  const int payloadBytes = out->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), str.length());
+  if (payloadBytes < 0) {
+    return payloadBytes;
+  }
+  // payloadBytes is non-negative here, so the cast only makes the comparison explicit.
+  if (static_cast<std::size_t>(payloadBytes) != str.length()) {
+    return -1;
+  }
+  return prefixBytes + payloadBytes;
+}
+
+/**
+ * Writes one flow file to out in this order: MAGIC_HEADER, the attribute
+ * count, each attribute as a key string and a value string, the content size
+ * as a 64-bit integer, and finally the content piped through reader_.
+ *
+ * @param flowFile flow file whose attributes and content are written
+ * @param out stream that receives the serialized form
+ * @return the sum of the values returned by all writes, the negative value of
+ *         the first failing step, or -1 if the magic header was written only in part
+ */
+int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
+  int total = 0;
+
+  const int magicBytes = out->write(const_cast<uint8_t*>(MAGIC_HEADER), sizeof(MAGIC_HEADER));
+  if (magicBytes < 0) {
+    return magicBytes;
+  }
+  // magicBytes is non-negative here, so the cast only makes the comparison explicit.
+  if (static_cast<std::size_t>(magicBytes) != sizeof(MAGIC_HEADER)) {
+    return -1;
+  }
+  total += magicBytes;
+
+  const auto& attributes = flowFile->getAttributes();
+  const int countBytes = writeLength(attributes.size(), out);
+  if (countBytes < 0) {
+    return countBytes;
+  }
+  total += countBytes;
+
+  for (const auto& attribute : attributes) {
+    const int keyBytes = writeString(attribute.first, out);
+    if (keyBytes < 0) {
+      return keyBytes;
+    }
+    total += keyBytes;
+
+    const int valueBytes = writeString(attribute.second, out);
+    if (valueBytes < 0) {
+      return valueBytes;
+    }
+    total += valueBytes;
+  }
+
+  const int sizeBytes = out->write(static_cast<uint64_t>(flowFile->getSize()));
+  if (sizeBytes < 0) {
+    return sizeBytes;
+  }
+  total += sizeBytes;
+
+  // The content itself is streamed by the reader into the same output.
+  InputStreamPipe contentPipe(out);
+  const int contentBytes = reader_(flowFile, &contentPipe);
+  if (contentBytes < 0) {
+    return contentBytes;
+  }
+  return total + contentBytes;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/src/serialization/PayloadSerializer.cpp
similarity index 69%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/src/serialization/PayloadSerializer.cpp
index ab961ad..44b0ed4 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/src/serialization/PayloadSerializer.cpp
@@ -16,32 +16,18 @@
* limitations under the License.
*/
-#pragma once
-
-#include <string>
-
-#include "FlowFileRecord.h"
+#include "serialization/PayloadSerializer.h"
+#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-class WriteCallback : public OutputStreamCallback {
-public:
- explicit WriteCallback(const std::string& data)
- : data_(data) {
- }
-
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
- if (data_.empty())
- return 0;
-
- return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
- }
-
- const std::string& data_;
-};
+/**
+ * Writes only the flow file's content to out by piping it through reader_;
+ * no attributes or framing are written here.
+ *
+ * @return the value returned by reader_
+ */
+int PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
+  InputStreamPipe contentPipe(out);
+  const int result = reader_(flowFile, &contentPipe);
+  return result;
+}
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index a2abe0f..c4545ec 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -26,7 +26,7 @@ namespace nifi {
namespace minifi {
namespace utils {
-int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
stream->seek(0);
if (stream->size() > 0) {
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
@@ -37,7 +37,7 @@ int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
return size_.load();
}
-int64_t StreamOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
stream->seek(0);
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
auto written = readFully(buffer.get(), size_);
diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp
index 182cf09..6dfa565 100644
--- a/libminifi/src/utils/FileOutputCallback.cpp
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -26,7 +26,7 @@ namespace nifi {
namespace minifi {
namespace utils {
-int64_t FileOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
if (stream->size() > 0) {
file_stream_.write(reinterpret_cast<char*>(const_cast<uint8_t*>(stream->getBuffer())), stream->size());
size_ += stream->size();
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index caa0aa8..783c95b 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -42,7 +42,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
return total_read;
}
- int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
return write(*stream.get(), stream->size());
}
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 94623b5..cd9c08c 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -56,7 +56,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
if (archive_buffer_)
delete[] archive_buffer_;
}
- int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
int64_t total_read = 0;
int64_t ret = 0;
do {
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 3b1f68a..4f3bdc0 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -36,6 +36,8 @@
#include "processors/LogAttribute.h"
#include "../TestBase.h"
#include "../unit/ProvenanceTestHelper.h"
+#include "serialization/FlowFileV3Serializer.h"
+#include "serialization/PayloadSerializer.h"
std::string FLOW_FILE;
std::string EXPECT_MERGE_CONTENT_FIRST;
@@ -93,7 +95,7 @@ class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
} while (size_ != capacity_);
return total_read;
}
- int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
return write(*stream.get(), capacity_);
}
@@ -845,3 +847,98 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Un
REQUIRE(attributes["tagCommon"] == "common");
REQUIRE(attributes["mime.type"] == "application/tar");
}
+
+// Test helper: appends the raw bytes of str to out; the write result is ignored.
+void writeString(const std::string& str, const std::shared_ptr<minifi::io::BaseStream>& out) {
+  const auto* bytes = reinterpret_cast<const uint8_t*>(str.data());
+  out->write(const_cast<uint8_t*>(bytes), str.length());
+}
+
+// Builds the expected merged content by hand with the selected serializer and
+// compares it against what MergeContent emits for the same three flow files.
+TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  std::string header = "BEGIN{";
+  std::string footer = "}END";
+  std::string demarcator = "_";
+
+  core::ProcessSession session(context);
+
+  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+    return session.read(ff, cb);
+  });
+  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+    return session.read(ff, cb);
+  });
+
+  // Assigned by whichever SECTION runs; starts as nullptr so a run that enters
+  // no SECTION fails the REQUIRE below instead of dereferencing garbage.
+  minifi::FlowFileSerializer* usedSerializer = nullptr;
+
+  std::vector<std::shared_ptr<core::FlowFile>> files;
+
+  for (const auto& content : std::vector<std::string>{"first ff content", "second ff content", "some other data"}) {
+    minifi::io::BufferStream contentStream{reinterpret_cast<const uint8_t*>(content.data()), static_cast<unsigned>(content.length())};
+    auto ff = session.create();
+    ff->removeAttribute(core::SpecialFlowAttribute::FILENAME);
+    ff->addAttribute("one", "banana");
+    ff->addAttribute("two", "seven");
+    session.importFrom(contentStream, ff);
+    session.flushContent();
+    files.push_back(ff);
+    input->put(ff);
+  }
+
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::Header, header);
+  context->setProperty(processors::MergeContent::Footer, footer);
+  context->setProperty(processors::MergeContent::Demarcator, demarcator);
+  context->setProperty(processors::BinFiles::MinEntries, "3");
+
+  SECTION("Payload Serializer") {
+    context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+    usedSerializer = &payloadSerializer;
+  }
+  SECTION("FlowFileV3 Serializer") {
+    context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE);
+    usedSerializer = &ffV3Serializer;
+    // only Binary Concatenation take these into account
+    header = "";
+    demarcator = "";
+    footer = "";
+  }
+
+  REQUIRE(usedSerializer != nullptr);
+
+  // Expected output: header, the serialized files separated by the demarcator, footer.
+  auto result = std::make_shared<minifi::io::BufferStream>();
+  writeString(header, result);
+  bool first = true;
+  for (const auto& ff : files) {
+    if (!first) {
+      writeString(demarcator, result);
+    }
+    first = false;
+    usedSerializer->serialize(ff, result);
+  }
+  writeString(footer, result);
+
+  std::string expected{reinterpret_cast<const char*>(result->getBuffer()), result->size()};
+
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  // One trigger per queued flow file.
+  for (int i = 0; i < 3; i++) {
+    auto mergeSession = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, mergeSession);
+    mergeSession->commit();
+  }
+
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow = output->poll(expiredFlowRecords);
+
+  REQUIRE(expiredFlowRecords.empty());
+  {
+    FixedBuffer callback(flow->getSize());
+    session.read(flow, &callback);
+    REQUIRE(callback.to_string() == expected);
+  }
+
+  LogTestController::getInstance().reset();
+}
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp
new file mode 100644
index 0000000..d89e335
--- /dev/null
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -0,0 +1,85 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "io/BaseStream.h"
+#include "serialization/FlowFileV3Serializer.h"
+#include "serialization/PayloadSerializer.h"
+#include "core/FlowFile.h"
+#include "../TestBase.h"
+
+// Test helper: creates a default-constructed FlowFileRecord and strips its
+// FILENAME attribute before handing it to the caller.
+// NOTE(review): presumably the filename is removed so that only the attributes
+// added by each test end up in the serialized output - confirm.
+std::shared_ptr<minifi::FlowFileRecord> createEmptyFlowFile() {
+  auto record = std::make_shared<minifi::FlowFileRecord>();
+  record->removeAttribute(core::SpecialFlowAttribute::FILENAME);
+  return record;
+}
+
+// Checks that PayloadSerializer emits exactly the flow file content: the two
+// attributes set on the flow file must not appear in the serialized output.
+TEST_CASE("Payload Serializer", "[testPayload]") {
+  std::string content = "flowFileContent";
+
+  // In-memory stream that the reader callback below hands to the serializer
+  // as the flow file's content.
+  auto payloadSource = std::make_shared<minifi::io::BufferStream>();
+  // write() takes a non-const byte pointer, hence the const_cast.
+  auto* rawContent = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data()));
+  payloadSource->write(rawContent, content.length());
+
+  // Destination stream the serializer writes into.
+  auto output = std::make_shared<minifi::io::BufferStream>();
+
+  auto record = createEmptyFlowFile();
+  record->setSize(content.size());
+  record->addAttribute("first", "one");
+  record->addAttribute("second", "two");
+
+  // The reader ignores which flow file is requested and always feeds the
+  // prepared in-memory content to the callback.
+  minifi::PayloadSerializer payloadSerializer(
+      [&payloadSource] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* callback) {
+        return callback->process(payloadSource);
+      });
+  payloadSerializer.serialize(record, output);
+
+  std::string actual{reinterpret_cast<const char*>(output->getBuffer()), output->size()};
+
+  REQUIRE(actual == content);
+}
+
+// Checks the exact byte layout produced by FlowFileV3Serializer for a flow
+// file with two attributes and a 15-byte payload.
+TEST_CASE("FFv3 Serializer", "[testFFv3]") {
+  std::string content = "flowFileContent";
+
+  // In-memory stream that the reader callback below hands to the serializer
+  // as the flow file's content.
+  auto payloadSource = std::make_shared<minifi::io::BufferStream>();
+  // write() takes a non-const byte pointer, hence the const_cast.
+  auto* rawContent = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data()));
+  payloadSource->write(rawContent, content.length());
+
+  // Destination stream the serializer writes into.
+  auto output = std::make_shared<minifi::io::BufferStream>();
+
+  auto record = createEmptyFlowFile();
+  record->setSize(content.size());
+  record->addAttribute("first", "one");
+  record->addAttribute("second", "two");
+
+  // The reader ignores which flow file is requested and always feeds the
+  // prepared in-memory content to the callback.
+  minifi::FlowFileV3Serializer v3Serializer(
+      [&payloadSource] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* callback) {
+        return callback->process(payloadSource);
+      });
+  v3Serializer.serialize(record, output);
+
+  std::string actual{reinterpret_cast<const char*>(output->getBuffer()), output->size()};
+
+  // Expected stream: magic header, attribute count, then each key and value
+  // preceded by a two-byte length, and finally an eight-byte payload length
+  // followed by the payload. Explicit lengths are passed to append() because
+  // the literals contain embedded NUL bytes.
+  std::string expected = "NiFiFF3";
+  expected.append("\x00\x02", 2);                  // number of attributes
+  expected.append("\x00\x05", 2).append("first");   // first key
+  expected.append("\x00\x03", 2).append("one");     // first value
+  expected.append("\x00\x06", 2).append("second");  // second key
+  expected.append("\x00\x03", 2).append("two");     // second value
+  expected.append("\x00\x00\x00\x00\x00\x00\x00\x0f", 8).append(content);  // payload of the flowFile
+
+  REQUIRE(actual == expected);
+}
+