You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/03/15 17:20:34 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-748 - Nanofi:
add custom C processor example
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b5e47b8 MINIFICPP-748 - Nanofi: add custom C processor example
b5e47b8 is described below
commit b5e47b80fa69d6e36a0e552ec7b6ba86d69b6649
Author: Arpad Boda <ab...@hortonworks.com>
AuthorDate: Tue Mar 5 19:11:04 2019 +0100
MINIFICPP-748 - Nanofi: add custom C processor example
This closes #499.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
nanofi/examples/CMakeLists.txt | 4 ++
nanofi/examples/hash_file.c | 121 +++++++++++++++++++++++++++++++++++++++++
nanofi/src/api/nanofi.cpp | 34 +++++++++---
3 files changed, 151 insertions(+), 8 deletions(-)
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 0b9ccde..a3db4d2 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -67,6 +67,10 @@ target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAG
target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
+add_executable(hash_file hash_file.c)
+
+target_link_libraries(hash_file nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
add_executable(transmit_flow transmit_flow.c)
target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
diff --git a/nanofi/examples/hash_file.c b/nanofi/examples/hash_file.c
new file mode 100644
index 0000000..a3e3311
--- /dev/null
+++ b/nanofi/examples/hash_file.c
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifdef OPENSSL_SUPPORT
+
+#include "api/nanofi.h"
+#include <openssl/md5.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h>
+
+/**
+ * Custom processor callback: stores the hex encoded MD5 digest of the incoming
+ * flow file's content in the attribute named by the "checksum_attr_name" property.
+ * Once a flow file was fetched, its record and the content buffer are released on every exit path.
+ */
+void custom_processor_logic(processor_session * ps, processor_context * ctx) {
+  flow_file_record * ffr = get(ps, ctx);
+  if(ffr == NULL) {
+    return;
+  }
+  char prop_value[50];
+  // Ask for at least one byte so an empty flow file is not mistaken for a failed allocation.
+  uint8_t * buffer = (uint8_t*)malloc(ffr->size > 0 ? ffr->size : 1);
+  if(buffer == NULL || get_property(ctx, "checksum_attr_name", prop_value, 50) != 0) {
+    free(buffer); // Out of memory or attr name not found: clean up instead of leaking
+    free_flowfile(ffr);
+    return;
+  }
+  get_content(ffr, buffer, ffr->size);
+
+  MD5_CTX context;
+  MD5_Init(&context);
+  MD5_Update(&context, buffer, ffr->size);
+  free(buffer);
+
+  unsigned char digest[MD5_DIGEST_LENGTH];
+  MD5_Final(digest, &context);
+
+  char md5string[2 * MD5_DIGEST_LENGTH + 1]; // two hex characters per digest byte plus NUL
+  for(int i = 0; i < MD5_DIGEST_LENGTH; ++i) {
+    snprintf(&md5string[i*2], 3, "%02x", (unsigned int)digest[i]);
+  }
+  add_attribute(ffr, prop_value, (void*)md5string, strlen(md5string));
+  transfer_to_relationship(ffr, ps, SUCCESS_RELATIONSHIP);
+  free_flowfile(ffr);
+}
+
+/**
+ * Example entry point: prints the MD5 checksum of the file named by argv[1],
+ * obtained by running it through the "md5proc" custom processor standalone.
+ * @return 0 when a checksum was printed, 1 otherwise
+ */
+int main(int argc, char **argv) {
+  if (argc < 2) {
+    printf("Error: must run ./hash_file <file>\n");
+    exit(1);
+  }
+  char *file = argv[1];
+  if(access( file, F_OK ) == -1) {
+    printf("Error: %s doesn't exist!\n", file);
+    exit(1);
+  }
+
+  // A failed stat() leaves "stats" unset, so it must not be inspected in that case
+  struct stat stats;
+  if (stat(file, &stats) != 0) {
+    printf("Error: cannot stat %s!\n", file);
+    exit(1);
+  }
+  if (S_ISDIR(stats.st_mode)){
+    printf("Error: %s is a directory!\n", file);
+    exit(1);
+  }
+
+  add_custom_processor("md5proc", custom_processor_logic);
+  standalone_processor *standalone_proc = create_processor("md5proc");
+  const char * checksum_attr_name = "md5attr";
+  set_standalone_property(standalone_proc, "checksum_attr_name", checksum_attr_name);
+
+  flow_file_record * ffr = create_flowfile(file, strlen(file));
+  flow_file_record * new_ff = invoke_ff(standalone_proc, ffr);
+  free_flowfile(ffr);
+
+  int rc = 1;
+  if (new_ff != NULL) {
+    attribute attr;
+    attr.key = checksum_attr_name;
+    attr.value_size = 0;
+    get_attribute(new_ff, &attr);
+    if (attr.value_size > 0) {
+      // Print exactly value_size bytes from the start of the value; no fixed-size copy needed
+      printf("Checksum of %s is %.*s\n", file, (int)attr.value_size, (const char*)attr.value);
+      rc = 0;
+    }
+    free_flowfile(new_ff);
+  }
+  if (rc != 0) {
+    printf("Error: no checksum could be computed for %s!\n", file);
+  }
+  free_standalone_processor(standalone_proc);
+  return rc;
+}
+
+#endif //OPENSSL_SUPPORT
\ No newline at end of file
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 756544e..a7a37df 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -371,9 +371,18 @@ int8_t remove_attribute(flow_file_record *ff, const char *key) {
int get_content(const flow_file_record* ff, uint8_t* target, int size) {
NULL_CHECK(0, ff, target);
auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo);
- auto stream = (*content_repo)->read(claim);
- return stream->read(target, size);
+ if(ff->crp && (*content_repo)) { // A content repository is attached: read through a resource claim on contentLocation
+ std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
+ *content_repo);
+ auto stream = (*content_repo)->read(claim);
+ return stream->read(target, size);
+ } else { // No content repository: load the file at contentLocation directly
+ file_buffer fb = file_to_buffer(ff->contentLocation);
+ size_t copy_size = size < fb.file_len ? size : fb.file_len; // Copy no more than the caller asked for or the file holds
+ memcpy(target, fb.buffer, copy_size*sizeof(uint8_t));
+ free(fb.buffer); // The temporary buffer is released once its bytes have been copied into target
+ return copy_size; // Number of bytes placed in target
+ }
}
/**
@@ -414,6 +423,7 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
file_buffer fb = file_to_buffer(ff->contentLocation);
stream = std::make_shared<minifi::io::DataStream>();
stream->writeData(fb.buffer, fb.file_len);
+ free(fb.buffer);
}
} else {
//The flowfile has no content - create an empty stream to create empty content
@@ -606,13 +616,21 @@ flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *
plan->reset();
if (input_ff) {
- auto ff_data = std::make_shared<flowfile_input_params>();
auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository> *>(input_ff->crp);
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
- *content_repo);
- ff_data->content_stream = (*content_repo)->read(claim);
- ff_data->attributes = *static_cast<std::map<std::string, std::string> *>(input_ff->attributes);
+ auto ff_data = std::make_shared<flowfile_input_params>();
+ if(input_ff->crp && (*content_repo)) {
+ std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
+ *content_repo);
+ ff_data->content_stream = (*content_repo)->read(claim);
+ } else {
+ ff_data->content_stream = std::make_shared<minifi::io::DataStream>();
+ file_buffer fb = file_to_buffer(input_ff->contentLocation);
+ ff_data->content_stream->writeData(fb.buffer, fb.file_len);
+ free(fb.buffer);
+ }
+
+ ff_data->attributes = *static_cast<std::map<std::string, std::string> *>(input_ff->attributes);
plan->runNextProcessor(nullptr, ff_data);
}
while (plan->runNextProcessor()) {